Re: Processing-time temporal join is not supported yet
Yes, the state of the versioned (dimension) table is retained, and expired data is cleaned up based on the watermark.

Best,
Leonard

> On Jun 23, 2021, at 19:20, op <520075...@qq.com.INVALID> wrote:
>
> Thanks. Does the event-time temporal join keep the latest state for every key of the versioned table? The official docs say the behavior depends on the watermarks of both sides, which I don't quite understand.
Re: Processing-time temporal join is not supported yet
Thanks. Does the event-time temporal join keep the latest state for every key of the versioned table? The official docs say the behavior depends on the watermarks of both sides, which I don't quite understand.

> On Jun 23, 2021, at 17:03, Leonard Xu wrote (referencing https://issues.apache.org/jira/browse/FLINK-19830):
> Processing-time temporal join is not supported yet.
Re: Processing-time temporal join is not supported yet
Hi,

Flink SQL currently supports event-time temporal joins against arbitrary tables/views, but does not yet support processing-time temporal joins against arbitrary tables/views (processing-time joins against tables that implement LookupTableSource are supported).

The main reason processing-time temporal joins against arbitrary tables are not supported is semantics. Specifically: when joining at processing time, Flink SQL has no good mechanism to guarantee that the dimension table has finished loading before the join starts. For example, if the Kafka topic backing the dimension stream contains 10 million records, there is currently no way to load all 10 million records before joining records from the main stream; during the job's startup phase, main-stream records that are expected to find a match may fail to join because the dimension table has not finished loading.

See https://issues.apache.org/jira/browse/FLINK-19830 <https://issues.apache.org/jira/browse/FLINK-19830>

Best,
Leonard

> On Jun 23, 2021, at 17:03, op <520075...@qq.com.INVALID> wrote:
>
> Processing-time temporal join is not supported yet.
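The loading-completeness problem described above can be sketched in a few lines of plain Python (illustrative only, not Flink code; the event encoding is invented for this example): a processing-time join looks up whatever dimension state has been loaded *so far*, so early main-stream records miss matches that arrive later.

```python
# Illustrative sketch of why processing-time temporal joins against an
# arbitrary table are disabled: the join processes main-stream records in
# arrival order and looks up only the dimension state loaded so far.

def processing_time_join(arrival_order):
    """arrival_order: list of ('dim', key, value) or ('main', key) events."""
    dim_state, results = {}, []
    for event in arrival_order:
        if event[0] == "dim":
            _, key, value = event
            dim_state[key] = value          # dimension row becomes visible
        else:
            _, key = event                   # main-stream record: look up now
            results.append((key, dim_state.get(key)))  # None = missed join
    return results

# Dimension data is still loading when the first main record arrives:
out = processing_time_join([
    ("dim", "u1", "gold"),
    ("main", "u2"),          # u2's dimension row has not been loaded yet
    ("dim", "u2", "silver"),
    ("main", "u2"),          # now it matches
])
```

The first lookup for `u2` misses even though the matching row exists in the source; only arrival order decides, which is exactly the non-determinism the issue describes.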
Processing-time temporal join is not supported yet
Hi, when I run a temporal join against a Kafka table, I get:

org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.

The SQL is:

create view visioned_table as
select user_id, event
from (
  select user_id, event,
         row_number() over (partition by user_id order by event_time desc) as rn
  from kafka_table1
) ta
where rn = 1;

select t1.*, t2.*
from mvp_rtdwd_event_app_quit t1
join visioned_table FOR SYSTEM_TIME AS OF t1.proc_time AS t2
  on t1.user_id = t2.user_id
where t1.user_id is not null
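For reference, the `visioned_table` view above deduplicates the Kafka stream down to the latest row per user_id (the `ROW_NUMBER() ... rn = 1` pattern). In plain Python terms this is what the view computes (an illustrative sketch, not Flink code):

```python
# Toy equivalent of the ROW_NUMBER()-based deduplication view:
# keep, per user_id, the row with the greatest event_time.

def latest_per_key(rows):
    """rows: iterable of (user_id, event, event_time)."""
    latest = {}
    for user_id, event, event_time in rows:
        cur = latest.get(user_id)
        if cur is None or event_time > cur[2]:
            latest[user_id] = (user_id, event, event_time)
    return sorted(latest.values())

latest_per_key([("u1", "a", 1), ("u1", "b", 3), ("u2", "c", 2)])
# -> [("u1", "b", 3), ("u2", "c", 2)]
```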
Re: Watermarks in Event Time Temporal Join
Thanks for your example, Maciej. I can explain more about the design.

> Let's have events.
> S1, id1, v1, 1
> S2, id1, v2, 1
>
> Nothing is happening as none of the streams have reached the watermark.
> Now let's add
> S2, id2, v2, 101
> This should trigger join for id1 because we have all the knowledge to
> perform this join (we know that the watermark for id1 record was
> reached).

Based on this example, suppose the left-stream events are

S1, id1, v1, 1
S1, id1, v2, 101
S1, id1, v3, 102
S1, id1, v4, 99   // assume the watermark is defined as `ts - 3`

For out-of-order data like the late event v4 (ts = 99), should it be ignored or not? The versioned table's watermark is based only on the versioned table's own data rather than both sides, so it cannot represent the progress of the left stream; that is why we use the left stream's watermark to handle out-of-order data.

Continuing with your example, assume the right-stream events are

S2, id2, v1, 101
S2, id3, v1, 101
S2, id3, v2, 102
S2, id3, v3, 103
S2, id3, v4, 104

How do we clean up the old version data, e.g. id3 (v1~v3)? If we cleaned them up based only on the versioned table's watermark (say that watermark is 105), then the left-stream records (id3, 101) and (id3, 102) could no longer find the correct version, right? So the left stream's watermark is also used to clean up outdated data, ensuring that every row in the left stream can find the correct version.

Best,
Leonard
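The design discussed in this thread (buffer left rows, join each against the latest right-side version at or before its timestamp, advance and clean state only up to the minimum of the two watermarks) can be sketched as a toy simulation. This is illustrative Python, not Flink's actual operator; the class and method names are invented:

```python
class TemporalJoinSim:
    """Toy event-time temporal join: the idea, not Flink's implementation."""

    def __init__(self):
        self.left_buffer = []      # pending left rows: (ts, key, value)
        self.versions = {}         # key -> sorted [(version_ts, value), ...]
        self.left_wm = self.right_wm = float("-inf")
        self.output = []

    def on_left(self, key, value, ts):
        if ts <= self.left_wm:
            return                 # late left event: dropped
        self.left_buffer.append((ts, key, value))

    def on_right(self, key, value, ts):
        self.versions.setdefault(key, []).append((ts, value))
        self.versions[key].sort()

    def on_watermark(self, left_wm=None, right_wm=None):
        if left_wm is not None:
            self.left_wm = max(self.left_wm, left_wm)
        if right_wm is not None:
            self.right_wm = max(self.right_wm, right_wm)
        ready = min(self.left_wm, self.right_wm)   # BOTH sides must advance
        pending = []
        for ts, key, value in sorted(self.left_buffer):
            if ts > ready:
                pending.append((ts, key, value))
                continue
            # join against the latest version with version_ts <= row ts
            match = None
            for vts, vval in self.versions.get(key, []):
                if vts <= ts:
                    match = vval
            if match is not None:
                self.output.append((key, value, match, ts))
        self.left_buffer = pending
        # prune versions at or below `ready`, keeping the newest expired one
        # (it may still be the matching version for future left rows):
        for key, vs in self.versions.items():
            expired = [v for v in vs if v[0] <= ready]
            self.versions[key] = expired[-1:] + [v for v in vs if v[0] > ready]

sim = TemporalJoinSim()
sim.on_left("id1", "v1", 1)
sim.on_right("id1", "v2", 1)
sim.on_watermark(left_wm=0, right_wm=0)   # neither side past ts=1: no output
sim.on_watermark(right_wm=100)            # right side alone is not enough
sim.on_watermark(left_wm=100)             # now the buffered left row joins
```

This reproduces the behaviour Maciej observed: advancing only the right-side watermark to 101 releases nothing; the id1 row is emitted only once the left watermark also passes its timestamp.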
Re: Watermarks in Event Time Temporal Join
Hi Leonard,

Let's assume we have two streams.

S1 - id, value1, ts1 with watermark = ts1 - 1
S2 - id, value2, ts2 with watermark = ts2 - 1

Then we have the following interval join:

SELECT id, value1, value2, ts1
FROM S1 JOIN S2
ON S1.id = S2.id and ts1 between ts2 - 1 and ts2

Let's have events (stream, id, value, ts):

S1, id1, v1, 1
S2, id1, v2, 1

For these events and the interval join, Flink will emit an event in the output stream:

id1, v1, v2, 1

despite the fact that the watermark for both streams has not been reached.

Now a similar situation for the event-time temporal join:

SELECT id, value1, value2, ts1
FROM S1 JOIN S2 FOR SYSTEM_TIME AS OF S1.ts1
ON S1.id = S2.id

Let's have events.

S1, id1, v1, 1
S2, id1, v2, 1

Nothing is happening, as none of the streams have reached the watermark. Now let's add

S2, id2, v2, 101

This should trigger the join for id1 because we have all the knowledge to perform this join (we know that the watermark for the id1 record was reached). Unfortunately, to trigger the join on id1 we also need a watermark on the S1 side, and I think this behaviour is wrong.

I hope I explained everything correctly.

Regards,
Maciek

On Tue, 27 Apr 2021 at 08:58, Leonard Xu wrote:
>
> Hello, Maciej
>
> > I agree the watermark should pass on versioned table side, because
> > this is the only way to know which version of record should be used.
> > But if we mimic the behaviour of interval join then the main stream watermark
> > could be skipped.
>
> IIRC, the rowtime interval join requires the watermark on both sides, and the
> watermark is used to clean up the outdated data and advance the data progress
> both in the rowtime interval join and the rowtime temporal join.
>
> Best,
> Leonard

--
Maciek Bryński
Re: Watermarks in Event Time Temporal Join
Hello, Maciej

> I agree the watermark should pass on versioned table side, because
> this is the only way to know which version of record should be used.
> But if we mimic the behaviour of interval join then the main stream watermark
> could be skipped.

IIRC, the rowtime interval join requires the watermark on both sides, and the watermark is used both to clean up outdated data and to advance the data progress, in the rowtime interval join as well as in the rowtime temporal join.

Best,
Leonard
Re: Watermarks in Event Time Temporal Join
Hi Shengkai,

Thanks for the answer. The question is whether we need to determine that an event in the main stream is late at all. Look at the interval join - an event is emitted as soon as there is a match between the left and right streams.

I agree the watermark should pass on the versioned table side, because this is the only way to know which version of a record should be used. But if we mimic the behaviour of the interval join, then the main stream watermark could be skipped.

Regards,
Maciek

On Mon, 26 Apr 2021 at 06:14, Shengkai Fang wrote:
>
> Hi, maverick.
>
> The watermark is used to determine whether a message is late or early. If we only
> use the watermark on the versioned table side, we have no means to determine
> whether the event in the main stream is ready to emit.
>
> Best,
> Shengkai
>
> maverick wrote on Mon, Apr 26, 2021 at 2:31 AM:
>>
>> Hi,
>> I'm curious why Event Time Temporal Join needs watermarks from both sides to
>> perform join.
>>
>> Shouldn't watermark on versioned table side be enough to perform join?
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

--
Maciek Bryński
Re: Watermarks in Event Time Temporal Join
Hi, maverick.

The watermark is used to determine whether a message is late or early. If we only use the watermark on the versioned table side, we have no means to determine whether the event in the main stream is ready to emit.

Best,
Shengkai

maverick wrote on Mon, Apr 26, 2021 at 2:31 AM:

> Hi,
> I'm curious why Event Time Temporal Join needs watermarks from both sides
> to perform join.
>
> Shouldn't watermark on versioned table side be enough to perform join?
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Watermarks in Event Time Temporal Join
Hi,
I'm curious why the Event Time Temporal Join needs watermarks from both sides to perform the join.

Shouldn't the watermark on the versioned table side be enough to perform the join?

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Time Temporal Join
Hi Timo,

Apologies for the late response. I somehow seem to have missed your reply.

I do want the join to be "time-based", since I need to perform a tumble grouping operation on top of the join. I tried setting the watermark strategy to `R` - INTERVAL '0.001' SECONDS; that didn't help either.

Note that we have a custom connector to an internal storage engine. The connector implements the ScanTableSource interface with the SupportsWatermarkPushDown ability. Would the watermark strategy in the table schema matter in that case?

I changed the query to the following to simplify further -

select F.C0, F.C1, F.R, D.C0, D.C1, D.R
from F JOIN D FOR SYSTEM_TIME AS OF F.R ON F.C1 = D.C1

I still do not see any output from the pipeline. The overall logs I see from the connector are the following -

Emit D.D row=+I(0,0,1970-01-01T00:00)@time=0      --> ctx.collectWithTimestamp(row_, rowtime);
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.D row=+I(1,1,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00)@time=0
Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F row=+I(3,3,1970-01-01T00:00:03)@time=3000
Emit D.D row=+I(3,3,1970-01-01T00:00)@time=0
Emit D.F row=+I(4,4,1970-01-01T00:00:04)@time=4000
Emit D.F wm=4000                                  ---> ctx.emitWatermark(new Watermark(wm));
Emit D.D wm=0

Now, if I change the rowtime of table D to 1s instead of 0, I get one row as output.

Emit D.D row=+I(0,0,1970-01-01T00:00:01)@time=1000
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(3,3,1970-01-01T00:00:01)@time=1000
Emit D.F wm=1000
Emit D.D wm=1000
reply: (1, "1", 1000, 1, "1", 1000)

The next row streamed from F, which should join with a row emitted from D, does not emit any output -

Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F wm=2000
NO REPLY

My understanding of temporal joins is that the latest row from D should be picked for joining rows from F. Is my expectation wrong that the (2, 2, 2s) row in F should join with the (2, 2, 1s) row in D?

Regards,
Satyam

On Tue, Mar 16, 2021 at 5:54 AM Timo Walther wrote:
> Hi Satyam,
>
> first of all your initial join query can also work, you just need to
> make sure that no time attribute is in the SELECT clause. As the
> exception indicates, you need to cast all time attributes to TIMESTAMP.
> The reason for this is some major design issue that is also explained
> here where a time attribute must not be in the output of a regular join:
>
> https://stackoverflow.com/a/64500296/806430
>
> However, since you would like to perform the join "time-based" either
> interval join or temporal join might solve your use cases.
>
> In your case I guess the watermark strategy of D is the problem. Are you
> sure the result is:
>
> > Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> > Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> > Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
> > Emit D watermark=0
>
> and not:
>
> > Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> > Emit D watermark=0
> > Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> > Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
>
> Or maybe the watermark is even dropped. Could you try to use a watermark
> strategy with
>
> `R` - INTERVAL '0.001' SECONDS
>
> I hope this helps.
> > Regards, > Timo > > > > On 16.03.21 04:37, Satyam Shekhar wrote: > > Hello folks, > > > > I would love to hear back your feedback on this. > > > > Regards, > > Satyam > > > > On Wed, Mar 10, 2021 at 6:53 PM Satyam Shekhar > <mailto:satyamshek...@gmail.com>> wrote: > > > > Hello folks, > > > > I am looking to enrich rows from an unbounded streaming table by > > joining it with a bounded static table while preserving rowtime for > > the streaming table. For example, let's consider table two tables F > > and D, where F is unbounded and D is bounded. The schema for the two > > tables is the following - > > > > F: > > |-- C0: BIGINT > > |-- C1: STRING > > |-- R: TIMESTAMP(3) **rowtime** > > |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS > > > > D: > > |-- C0: BIGINT > > |-- C1: STRING NOT NULL > > > > I'd like to run the following query on this schema - > > > > select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second) > > from F join D ON F.C1 = D.C1 > > group by
Re: Time Temporal Join
Hi Satyam,

first of all, your initial join query can also work; you just need to make sure that no time attribute is in the SELECT clause. As the exception indicates, you need to cast all time attributes to TIMESTAMP. The reason for this is a major design issue, also explained here, where a time attribute must not be in the output of a regular join:

https://stackoverflow.com/a/64500296/806430

However, since you would like to perform the join "time-based", either an interval join or a temporal join might solve your use case.

In your case I guess the watermark strategy of D is the problem. Are you sure the result is:

> Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
> Emit D watermark=0

and not:

> Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> Emit D watermark=0
> Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> Emit D row=+I(2,"2",1970-01-01T00:00)@time=0

Or maybe the watermark is even dropped. Could you try to use a watermark strategy with

`R` - INTERVAL '0.001' SECONDS

I hope this helps.

Regards,
Timo

On 16.03.21 04:37, Satyam Shekhar wrote:

Hello folks,

I would love to hear back your feedback on this.

Regards,
Satyam

On Wed, Mar 10, 2021 at 6:53 PM Satyam Shekhar <satyamshek...@gmail.com> wrote:

Hello folks,

I am looking to enrich rows from an unbounded streaming table by joining it with a bounded static table while preserving rowtime for the streaming table. For example, let's consider two tables F and D, where F is unbounded and D is bounded. The schema for the two tables is the following -

F:
 |-- C0: BIGINT
 |-- C1: STRING
 |-- R: TIMESTAMP(3) **rowtime**
 |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS

D:
 |-- C0: BIGINT
 |-- C1: STRING NOT NULL

I'd like to run the following query on this schema -

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
from F join D ON F.C1 = D.C1
group by D.C1, tumble(F.R, interval '1' second)

However, I run into the following error while running the above query -

"Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before."

My understanding from reading the docs is that the Time Temporal Join is meant to solve this problem. So I model table D as the following -

D:
 |-- C0: BIGINT
 |-- C1: STRING NOT NULL
 |-- R: TIMESTAMP(3)
 |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
 |-- CONSTRAINT 2da2dd2e-9937-48cb-9dec-4f6055713004 PRIMARY KEY (C1)

With column D.R always set to 0, I modify the query as follows -

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
from F join D FOR SYSTEM_TIME AS OF F.R ON F.C1 = D.C1
group by D.C1, tumble(F.R, interval '1' second)

The above query runs but does not return any result. I have the following data in D initially -

Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
Emit D watermark=0

And F streams the following rows -

Emit F row=+I(0,"0",1970-01-01T00:00)@time=0
Emit F row=+I(1,"1",1970-01-01T00:00:10)@time=1000
Emit F watermark=1000

I expect that the two rows in F will join with matching rows (on C1) in D and produce some output. But I do not see anything in the output. So I have the following questions -

1. Is time temporal join the correct tool to solve this problem?
2. What could be the reason for not getting any output rows in the result?

Thanks,
Satyam
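Timo's suspicion about D's watermark matches the event-time temporal join contract discussed elsewhere in this archive: a left row is only released once both watermarks have passed its timestamp. A tiny sketch of that gating condition (illustrative Python with invented names, not Flink code):

```python
# Why a dimension table whose rowtime never advances stalls the join:
# the temporal-join operator only releases a left row once
# min(left watermark, right watermark) has reached the row's timestamp.

def releasable(left_rows_ts, left_wm, right_wm):
    """Return timestamps of left rows that the join may emit."""
    ready = min(left_wm, right_wm)
    return [ts for ts in left_rows_ts if ts <= ready]

# With every D row at t=0 and strategy `R` - INTERVAL '0' SECONDS,
# D's watermark is stuck at 0, so F rows with ts > 0 are never released:
releasable([0, 1000], left_wm=1000, right_wm=0)      # -> [0]
# Advancing D's rowtime (rows at t=1000, watermark 1000) unblocks the join:
releasable([0, 1000], left_wm=1000, right_wm=1000)   # -> [0, 1000]
```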
Re: Time Temporal Join
Hello folks,

I would love to hear back your feedback on this.

Regards,
Satyam

On Wed, Mar 10, 2021 at 6:53 PM Satyam Shekhar wrote:

> Hello folks,
>
> I am looking to enrich rows from an unbounded streaming table by
> joining it with a bounded static table while preserving rowtime for the
> streaming table. For example, let's consider two tables F and D,
> where F is unbounded and D is bounded. The schema for the two tables is the
> following -
>
> F:
> |-- C0: BIGINT
> |-- C1: STRING
> |-- R: TIMESTAMP(3) **rowtime**
> |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
>
> D:
> |-- C0: BIGINT
> |-- C1: STRING NOT NULL
>
> I'd like to run the following query on this schema -
>
> select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
> from F join D ON F.C1 = D.C1
> group by D.C1, tumble(F.R, interval '1' second)
>
> However, I run into the following error while running the above query -
>
> "Rowtime attributes must not be in the input rows of a regular join. As a
> workaround you can cast the time attributes of input tables to TIMESTAMP
> before."
>
> My understanding reading the docs is that Time Temporal Join is meant to
> solve this problem. So I model table D as the following -
>
> D:
> |-- C0: BIGINT
> |-- C1: STRING NOT NULL
> |-- R: TIMESTAMP(3)
> |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
> |-- CONSTRAINT 2da2dd2e-9937-48cb-9dec-4f6055713004 PRIMARY KEY (C1)
>
> With column D.R always set to 0, I modify the query as follows -
>
> select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
> from F join D FOR SYSTEM_TIME AS OF F.R ON F.C1 = D.C1
> group by D.C1, tumble(F.R, interval '1' second)
>
> The above query runs but does not return any result. I have the following
> data in D initially -
>
> Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
> Emit D watermark=0
>
> And F streams the following rows -
>
> Emit F row=+I(0,"0",1970-01-01T00:00)@time=0
> Emit F row=+I(1,"1",1970-01-01T00:00:10)@time=1000
> Emit F watermark=1000
>
> I expect that two rows in F will join with matching rows (on C1) in D and
> produce some output. But I do not see anything in the output.
>
> So I have the following questions -
>
> 1. Is time temporal join the correct tool to solve this problem?
> 2. What could be the reason for not getting any output rows in the result?
>
> Thanks,
> Satyam
Time Temporal Join
Hello folks,

I am looking to enrich rows from an unbounded streaming table by joining it with a bounded static table while preserving rowtime for the streaming table. For example, let's consider two tables F and D, where F is unbounded and D is bounded. The schema for the two tables is the following -

F:
 |-- C0: BIGINT
 |-- C1: STRING
 |-- R: TIMESTAMP(3) **rowtime**
 |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS

D:
 |-- C0: BIGINT
 |-- C1: STRING NOT NULL

I'd like to run the following query on this schema -

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
from F join D ON F.C1 = D.C1
group by D.C1, tumble(F.R, interval '1' second)

However, I run into the following error while running the above query -

"Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before."

My understanding from reading the docs is that the Time Temporal Join is meant to solve this problem. So I model table D as the following -

D:
 |-- C0: BIGINT
 |-- C1: STRING NOT NULL
 |-- R: TIMESTAMP(3)
 |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
 |-- CONSTRAINT 2da2dd2e-9937-48cb-9dec-4f6055713004 PRIMARY KEY (C1)

With column D.R always set to 0, I modify the query as follows -

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
from F join D FOR SYSTEM_TIME AS OF F.R ON F.C1 = D.C1
group by D.C1, tumble(F.R, interval '1' second)

The above query runs but does not return any result. I have the following data in D initially -

Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
Emit D watermark=0

And F streams the following rows -

Emit F row=+I(0,"0",1970-01-01T00:00)@time=0
Emit F row=+I(1,"1",1970-01-01T00:00:10)@time=1000
Emit F watermark=1000

I expect that the two rows in F will join with matching rows (on C1) in D and produce some output. But I do not see anything in the output.

So I have the following questions -

1. Is time temporal join the correct tool to solve this problem?
2. What could be the reason for not getting any output rows in the result?

Thanks,
Satyam
Re: Processing-time temporal join is not supported yet.
Sorry, I mean you can create a UDTF in which you cache the data from your files, and then enrich your stream with the LATERAL TABLE syntax. BTW, you can reference FileSystemLookupFunction.java [1]. If we plan to support lookup for the filesystem connector, we should use this function too.

Best,
Leonard

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java

> On Mar 4, 2021, at 19:26, eric hoffmann wrote:
>
> Thx Leonard,
> by UDF you mean a custom table source on s3?
>
> On Thu, Mar 4, 2021 at 05:31, Leonard Xu <xbjt...@gmail.com> wrote:
> Hi, Eric
>
>> what will be the best workaround to enrich stream of data from a kafka
>> topics with statical data based on id?
> Currently you can put your static data in Hive/JDBC/HBase, which support
> lookups, in the full table env as a workaround.
> You can also write a UDF which caches the s3 files and can be used to enrich
> your stream data.
>
> Best,
> Leonard
>
>> On Sat, Feb 27, 2021 at 05:15, Leonard Xu <xbjt...@gmail.com> wrote:
>> Hi, Eric
>>
>> Firstly, FileSystemTableSource does not implement LookupTableSource, which
>> means we cannot directly look up a filesystem table.
>>
>> In FLINK-19830, we plan to support processing-time temporal joins against any
>> table/view by looking up the data in join operator state scanned from the
>> filesystem table, but as the issue describes: join processing for the left
>> stream doesn't wait for the complete snapshot of the temporal table, and this may
>> mislead users in a production environment.
>> E.g.: your s3 table has 1000 records, but the join operator does not know when
>> all records have arrived, so the correlation may be incorrect; thus we
>> disabled this feature.
>>
>> I think we can implement LookupTableSource for FileSystemTableSource;
>> after that, we can directly look up a filesystem table. The implementation
>> will be similar to the Hive table, where we cache all data of the
>> files and then look up the cache. Could you help create a JIRA ticket for
>> this?
>>
>> Best,
>> Leonard
>>
>>> On Feb 26, 2021, at 23:41, Matthias Pohl <matth...@ververica.com> wrote:
>>>
>>> Hi Eric,
>>> it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the
>>> thread. Maybe he has a workaround for your case.
>>>
>>> Best,
>>> Matthias
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-19830
>>> <https://issues.apache.org/jira/browse/FLINK-19830>
>>> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <sigfrid.hoffm...@gmail.com> wrote:
>>> Hello
>>> Working with flink 1.12.1, I read in the docs that a processing-time temporal
>>> join is supported for kv-like joins, but when I try it I get:
>>>
>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>> Processing-time temporal join is not supported yet.
>>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273) >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224) >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115) >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56) >>> at >>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) >>> at >>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56) >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) >>> at >>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.sc
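The UDTF-with-cache workaround Leonard describes can be sketched roughly as follows. This is plain Python standing in for a Flink UDTF; `CachedLookup`, the loader callable, and the TTL are invented for illustration:

```python
# Illustrative sketch of a cached lookup function: load static data once
# into an in-memory cache, enrich each streaming record by key, and
# reload the data after a TTL expires (names and TTL are hypothetical).

import time

class CachedLookup:
    def __init__(self, loader, ttl_seconds=600.0):
        self.loader = loader              # callable returning {key: row}
        self.ttl = ttl_seconds
        self.cache, self.loaded_at = None, 0.0

    def eval(self, key):
        now = time.monotonic()
        if self.cache is None or now - self.loaded_at > self.ttl:
            self.cache = self.loader()    # e.g. read the s3/file data here
            self.loaded_at = now
        return self.cache.get(key)        # None = no static row for this key

# Enrich a toy stream of ids against a static data set:
lookup = CachedLookup(lambda: {"id1": ("id1", "static-attr")})
enriched = [(evt, lookup.eval(evt)) for evt in ["id1", "id2"]]
```

Note this sketch has the same caveat the thread raises for processing-time joins: records processed before the first (re)load would see stale or missing dimension data.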
Re: Processing-time temporal join is not supported yet.
Thx Leonard, by UDF you mean a custom table source on s3?

On Thu, Mar 4, 2021 at 05:31, Leonard Xu wrote:

> Hi, Eric
>
>> what will be the best workaround to enrich stream of data from a kafka
>> topics with statical data based on id?
>
> Currently you can put your static data in Hive/JDBC/HBase, which support
> lookups, in the full table env as a workaround.
> You can also write a UDF which caches the s3 files and can be used to
> enrich your stream data.
>
> Best,
> Leonard
>
> On Sat, Feb 27, 2021 at 05:15, Leonard Xu wrote:
>
>> Hi, Eric
>>
>> Firstly, FileSystemTableSource does not implement LookupTableSource, which
>> means we cannot directly look up a filesystem table.
>>
>> In FLINK-19830, we plan to support processing-time temporal joins against any
>> table/view by looking up the data in join operator state scanned from
>> the filesystem table, but as the issue describes: join processing for
>> the left stream doesn't wait for the complete snapshot of the temporal table,
>> and this may mislead users in a production environment.
>> E.g.: your s3 table has 1000 records, but the join operator does not know
>> when all records have arrived, so the correlation may be incorrect; thus we
>> disabled this feature.
>>
>> I think we can implement LookupTableSource for FileSystemTableSource;
>> after that, we can directly look up a filesystem table. The implementation
>> will be similar to the Hive table, where we cache all data of the
>> files and then look up the cache. Could you help create a JIRA ticket for
>> this?
>>
>> Best,
>> Leonard
>>
>> On Feb 26, 2021, at 23:41, Matthias Pohl wrote:
>>
>> Hi Eric,
>> it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the
>> thread. Maybe he has a workaround for your case.
>> >> Best, >> Matthias >> >> [1] https://issues.apache.org/jira/browse/FLINK-19830 >> >> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann < >> sigfrid.hoffm...@gmail.com> wrote: >> >>> Hello >>> Working with flink 1.12.1 i read in the doc that Processing-time >>> temporal join is supported for kv like join but when i try i get a: >>> >>> Exception in thread "main" org.apache.flink.table.api.TableException: >>> Processing-time temporal join is not supported yet. >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273) >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224) >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115) >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56) >>> at >>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) >>> at >>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56) >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) >>> at >>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) >>> at >>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) >>> at >>> 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79) >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43) >>> at >>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) >>> at >>> org.apache.flink.table.planner.plan.no
Re: Processing-time temporal join is not supported yet.
Hi, Eric

> what will be the best workaround to enrich stream of data from a kafka topics with statical data based on id?

Currently you can put your static data in Hive/JDBC/HBase, which support lookups, in the full table env as a workaround.
You can also write a UDF which caches the s3 files and can be used to enrich your stream data.

Best,
Leonard

> On Sat, Feb 27, 2021 at 05:15, Leonard Xu <xbjt...@gmail.com> wrote:
>
> Hi, Eric
>
> Firstly, FileSystemTableSource does not implement LookupTableSource, which means
> we cannot directly look up a filesystem table.
>
> In FLINK-19830, we plan to support processing-time temporal joins against any
> table/view by looking up the data in join operator state scanned from the
> filesystem table, but as the issue describes: join processing for the left stream
> doesn't wait for the complete snapshot of the temporal table, and this may mislead
> users in a production environment.
> E.g.: your s3 table has 1000 records, but the join operator does not know when
> all records have arrived, so the correlation may be incorrect; thus we
> disabled this feature.
>
> I think we can implement LookupTableSource for FileSystemTableSource;
> after that, we can directly look up a filesystem table. The implementation
> will be similar to the Hive table, where we cache all data of the
> files and then look up the cache. Could you help create a JIRA ticket for
> this?
>
> Best,
> Leonard
>
>> On Feb 26, 2021, at 23:41, Matthias Pohl <matth...@ververica.com> wrote:
>>
>> Hi Eric,
>> it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the
>> thread. Maybe he has a workaround for your case.
>> >> Best, >> Matthias >> >> [1] https://issues.apache.org/jira/browse/FLINK-19830 >> <https://issues.apache.org/jira/browse/FLINK-19830> >> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann > <mailto:sigfrid.hoffm...@gmail.com>> wrote: >> Hello >> Working with flink 1.12.1 i read in the doc that Processing-time temporal >> join is supported for kv like join but when i try i get a: >> >> Exception in thread "main" org.apache.flink.table.api.TableException: >> Processing-time temporal join is not supported yet. >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56) >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) >> at >> 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43) >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43) >> at >> org.
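The Hive/JDBC/HBase workaround Leonard mentions can be sketched in Flink SQL. A hedged example against a JDBC table (the MySQL URL, database, and table name are purely illustrative; `kafkaTable` is the probe-side table from the original report):

```sql
-- Hypothetical dimension table backed by the JDBC connector, which
-- implements LookupTableSource and therefore supports lookup joins.
CREATE TABLE dim_users (
  id STRING,
  test STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'users'
);

-- A processing-time lookup join works here because the right side
-- is a LookupTableSource, unlike the filesystem connector.
SELECT e.id, r.test
FROM kafkaTable AS e
JOIN dim_users FOR SYSTEM_TIME AS OF e.proctime AS r
  ON e.id = r.id;
```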
Re: Processing-time temporal join is not supported yet.
Hi Leonard,

Thanks for your reply. No problem to help with the JIRA ticket. In my situation, in a pure SQL environment, what would be the best workaround to enrich a stream of data from Kafka topics with static data based on id? I know how to do it with the DataStream API.

eric

On Sat, Feb 27, 2021 at 05:15, Leonard Xu wrote:

> Hi Eric,
>
> Firstly, FileSystemTableSource does not implement LookupTableSource, which means we cannot directly look up a filesystem table.
Re: Processing-time temporal join is not supported yet.
Hi Eric,

Firstly, FileSystemTableSource does not implement LookupTableSource, which means we cannot directly look up a filesystem table.

In FLINK-19830 we planned to support processing-time temporal joins against any table/view by looking up the data in join operator state scanned from the filesystem table, but as the issue describes, join processing for the left stream doesn't wait for a complete snapshot of the temporal table, which may mislead users in a production environment. E.g.: your S3 table has 1000 records, but the join operator does not know when all records have arrived, so the correlation may be incorrect; thus we disabled this feature.

I think we can implement LookupTableSource for FileSystemTableSource; after that, we can directly look up a filesystem table. The implementation would be similar to the Hive table, where we cache all the data of the files and then look up the cache. Could you help create a JIRA ticket for this?

Best,
Leonard

> On Feb 26, 2021, at 23:41, Matthias Pohl wrote:
>
> Hi Eric,
> it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the thread. Maybe he has a workaround for your case.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-19830
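The "cache all data, then look up the cache" pattern Leonard describes already exists as a connector option for JDBC-backed dimension tables. A hedged sketch (table name, URL, and cache sizes are illustrative, not from the thread):

```sql
-- Hypothetical JDBC dimension table with the connector's lookup cache
-- enabled: repeated probes for the same id are served from memory and
-- only refreshed from the database after the TTL expires.
CREATE TABLE dim_users_cached (
  id STRING,
  test STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'users',
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '10min'
);
```

The trade-off is staleness: entries may be up to `lookup.cache.ttl` behind the source table, which is usually acceptable for static or slowly changing enrichment data.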
Re: Processing-time temporal join is not supported yet.
Hi Eric,
it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the thread. Maybe he has a workaround for your case.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-19830

On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann wrote:

> Hello
> Working with Flink 1.12.1, I read in the docs that a processing-time temporal join is supported for KV-like joins, but when I try I get:
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Processing-time temporal join is not supported yet.
Processing-time temporal join is not supported yet.
Hello

Working with Flink 1.12.1, I read in the docs that a processing-time temporal join is supported for KV-like joins, but when I try I get:

Exception in thread "main" org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)

my query:

SELECT e.id, r.test
FROM kafkaTable AS e
JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime AS r
  ON e.id = r.id

my s3 table:

CREATE TABLE s3Table (
  id STRING,
  test STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://fs/',
  'format' = 'json'
)

my kafka table:

CREATE TABLE kafkaTable (
  id STRING,
  foo STRING,
  bar BIGINT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'mytopic',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'properties.group.id' = 'mygroup',
  'format' = 'json',
  'scan.startup.mode' = 'group-offsets',
  'properties.enable.auto.commit' = 'false'
)
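If the enrichment data can instead be produced as a changelog with an event-time attribute, an event-time temporal join is supported in Flink 1.12. A hedged sketch, not from the thread: the topic, format, watermark interval, and the probe side's `row_time` column are all assumptions.

```sql
-- Hypothetical versioned table: an event-time temporal join requires
-- both a primary key and a watermark on the build side.
CREATE TABLE versioned_users (
  id STRING,
  test STRING,
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'users-changelog',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'format' = 'debezium-json'
);

-- The probe side must also carry an event-time attribute (row_time,
-- assumed here; the kafkaTable above only defines proctime).
SELECT e.id, r.test
FROM kafkaTable AS e
JOIN versioned_users FOR SYSTEM_TIME AS OF e.row_time AS r
  ON e.id = r.id;
```

Unlike the disabled processing-time variant, the watermarks on both inputs tell the join operator when the build-side snapshot for a given point in time is complete, which is exactly the guarantee the filesystem-backed processing-time join cannot provide.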