Re: Processing-time temporal join is not supported yet

2021-06-23 Thread op
Is the watermark here the event-time watermark? And is the state kept and cleaned up per key?





Re: Processing-time temporal join is not supported yet

2021-06-23 Thread jiangshan0...@163.com



jiangshan0...@163.com
 
From: op
Date: 2021-06-23 17:03
To: user-zh
Subject: Processing-time temporal join is not supported yet
Hi, I get the following error when doing a temporal join on a Kafka table:
org.apache.flink.table.api.TableException: Processing-time temporal join is not 
supported yet.
 
The SQL is:
 
 
create view visioned_table as
select
  user_id,
  event
from (
  select
    user_id,
    event,
    row_number() over (partition by user_id order by event_time desc) as rn
  from kafka_table1
) ta
where rn = 1;

select
  t1.*, t2.*
from mvp_rtdwd_event_app_quit t1
join visioned_table FOR SYSTEM_TIME AS OF t1.proc_time AS t2
  on t1.user_id = t2.user_id
where t1.user_id is not null


Re: Processing-time temporal join is not supported yet

2021-06-23 Thread Leonard Xu
Yes, the dimension table's state is retained; the watermark is used to clean up expired data.

Best,
Leonard


> On Jun 23, 2021, at 19:20, op <520075...@qq.com.INVALID> wrote:
> 
> Thanks. Does the event time temporal join keep the latest state for every key of the
> temporal table? The official docs say it depends on the watermarks of both sides,
> which I don't quite understand.
> 


Re: Processing-time temporal join is not supported yet

2021-06-23 Thread op
Thanks. Does the event time temporal join keep the latest state for every key of the
temporal table? The official docs say it depends on the watermarks of both sides,
which I don't quite understand.





Re: Processing-time temporal join is not supported yet

2021-06-23 Thread Leonard Xu
Hi,

Flink SQL currently supports event time temporal joins against arbitrary
tables/views, but does not yet support processing-time temporal joins against
arbitrary tables/views (a processing-time join is supported against tables
that implement LookupTableSource).

The main reason processing-time temporal joins against arbitrary tables are
not supported is semantics. When joining on processing time, Flink SQL has no
reliable mechanism to guarantee that the dimension table is fully loaded before
the join starts. For example, if the Kafka topic backing the dimension table
holds 10 million records, there is currently no way to load all 10 million
records before joining the main stream, so during job startup, main-stream
records that should find a match may fail to join because the dimension table
has not finished loading.

See https://issues.apache.org/jira/browse/FLINK-19830 
<https://issues.apache.org/jira/browse/FLINK-19830> 

Best,
Leonard



> On Jun 23, 2021, at 17:03, op <520075...@qq.com.INVALID> wrote:
> 
>  Processing-time temporal join is not supported yet.
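
The loading race described above can be sketched in a few lines of plain Python (the keys, values, and snapshot function here are invented for illustration and are not Flink API): a main-stream event that arrives before the dimension table has finished loading misses a row that a later event finds.

```python
def enrich(stream_events, dim_snapshot):
    """Join each main-stream event against whatever part of the
    dimension table has been loaded so far."""
    out = []
    for i, (key, value) in enumerate(stream_events):
        dim = dim_snapshot(i)  # state of the dimension table at event i
        out.append((key, value, dim.get(key)))
    return out

full_dim = {"id1": "red", "id2": "blue"}
# Only half the dimension table is loaded when the first event arrives.
snapshots = [{"id1": "red"}, full_dim]
result = enrich([("id2", 10), ("id2", 11)], lambda i: snapshots[i])
print(result)  # first lookup misses although id2 exists in the full table
```

Running this yields `[('id2', 10, None), ('id2', 11, 'blue')]`: the first record fails to join even though its key exists in the full dimension table, which is exactly the correctness problem that led to disabling the feature.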



Processing-time temporal join is not supported yet

2021-06-23 Thread op
Hi, I get the following error when doing a temporal join on a Kafka table:
org.apache.flink.table.api.TableException: Processing-time temporal join is not 
supported yet.

The SQL is:


create view visioned_table as
select
  user_id,
  event
from (
  select
    user_id,
    event,
    row_number() over (partition by user_id order by event_time desc) as rn
  from kafka_table1
) ta
where rn = 1;

select
  t1.*, t2.*
from mvp_rtdwd_event_app_quit t1
join visioned_table FOR SYSTEM_TIME AS OF t1.proc_time AS t2
  on t1.user_id = t2.user_id
where t1.user_id is not null
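
For reference, the view above keeps only the newest row per user_id; the same top-1-per-key logic can be sketched in plain Python (the sample rows are invented; the columns follow the query):

```python
def latest_per_key(rows):
    """Keep only the newest row per user_id, mimicking
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_time DESC) = 1."""
    latest = {}
    for row in rows:  # row: (user_id, event, event_time)
        uid, _, ts = row
        if uid not in latest or ts > latest[uid][2]:
            latest[uid] = row
    return list(latest.values())

rows = [
    ("u1", "open", 1),
    ("u1", "quit", 5),
    ("u2", "open", 2),
]
print(latest_per_key(rows))  # -> [('u1', 'quit', 5), ('u2', 'open', 2)]
```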

Re: Watermarks in Event Time Temporal Join

2021-04-28 Thread Leonard Xu
Thanks for your example, Maciej

I can explain more about the design.
> Let's have events.
> S1, id1, v1, 1
> S2, id1, v2, 1
> 
> Nothing is happening as none of the streams have reached the watermark.
> Now let's add
> S2, id2, v2, 101
> This should trigger join for id1 because we have all the knowledge to
> perform this join (we know that the watermark for id1 record was
> reached).

Based on this example, if the left stream events are
S1, id1, v1, 1
S1, id1, v2, 101
S1, id1, v3, 102
S1, id1, v4, 99 // assume the watermark is defined as `ts - 3`
For out-of-order data like the late event v4 (99): should it be ignored or not? The
versioned table's watermark is based only on the versioned table's data rather than
both sides, so it cannot represent the progress of the left stream; currently we use
the left stream's watermark to handle out-of-order data.
 
Continuing with your example, assume the right stream events are 
S2, id2, v1, 101
S2, id3, v1, 101
S2, id3, v2, 102
S2, id3, v3, 103
S2, id3, v4, 104
How do we clean up the old version data, e.g. id3 (v1~v3)? If you clean it up based
only on the versioned table's watermark (say the versioned table's watermark is 105),
the left-stream rows (id3, 101) and (id3, 102) cannot find the correct
version, right?
Currently the left stream's watermark is used to clean up the outdated data and
ensure that every row in the left stream can find the correct version.

Best,
Leonard
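
The rule being discussed here, where a left-stream row joins the newest right-side version at or before its own timestamp, can be sketched as follows (plain Python, not Flink code, using the id3 versions from the example above):

```python
import bisect

def lookup_version(versions, probe_ts):
    """Event-time temporal join lookup: return the value of the newest
    right-side version whose timestamp is <= the probe row's timestamp.
    `versions` is a list of (ts, value) sorted by ts."""
    idx = bisect.bisect_right([ts for ts, _ in versions], probe_ts)
    return versions[idx - 1][1] if idx else None

# Right-side versions for key id3, as in the example above.
versions = [(101, "v1"), (102, "v2"), (103, "v3"), (104, "v4")]
print(lookup_version(versions, 102))  # -> v2
# If versions were cleaned up using only the right-side watermark (105),
# a left row with ts=102 could no longer find "v2"; hence cleanup must
# also respect the left stream's watermark.
```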

Re: Watermarks in Event Time Temporal Join

2021-04-28 Thread Maciej Bryński
Hi Leonard,
Let's assume we have two streams.
S1 - id, value1, ts1 with watermark = ts1 - 1
S2 - id, value2, ts2 with watermark = ts2 - 1

Then we have the following interval join
SELECT id, value1, value2, ts1 FROM S1 JOIN S2 ON S1.id = S2.id and
ts1 between ts2 - 1 and ts2

Let's have events.
stream, id, value, ts
S1, id1, v1, 1
S2, id1, v2, 1

For these events, the interval join in Flink will emit an event in the output stream:
id1, v1, v2, 1
despite the fact that the watermark for both streams has not been reached.

Now a similar situation for the event time temporal join
SELECT id, value1, value2, ts1 FROM S1 JOIN S2 FOR SYSTEM_TIME AS OF
S1.ts1 ON S1.id = S2.id

Let's have events.
S1, id1, v1, 1
S2, id1, v2, 1

Nothing is happening as none of the streams have reached the watermark.
Now let's add
S2, id2, v2, 101
This should trigger join for id1 because we have all the knowledge to
perform this join (we know that the watermark for id1 record was
reached).
Unfortunately, to trigger the join on id1 we also need a watermark on the S1
side, and I think this behaviour is wrong.

I hope I explained everything correctly.

Regards,
Maciek

On Tue, Apr 27, 2021 at 08:58, Leonard Xu wrote:
>
> Hello, Maciej
>
> I agree the watermark should pass on versioned table side, because
> this is the only way to know which version of record should be used.
> But if we mimics behaviour of interval join then main stream watermark
> could be skipped.
>
>
> IIRC, rowtime interval join requires the watermark on both sides, and the 
> watermark
> will be used to clean up the outdated data and advance the data progress both 
> in rowtime  interval join and rowtime temporal join.
>
> Best,
> Leonard
>


-- 
Maciek Bryński


Re: Watermarks in Event Time Temporal Join

2021-04-27 Thread Leonard Xu
Hello, Maciej
> I agree the watermark should pass on versioned table side, because
> this is the only way to know which version of record should be used.
> But if we mimics behaviour of interval join then main stream watermark
> could be skipped.

IIRC, rowtime interval join requires the watermark on both sides, and the 
watermark 
will be used to clean up the outdated data and advance the data progress both 
in rowtime  interval join and rowtime temporal join.

Best,
Leonard
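
A minimal sketch of why both sides matter: a two-input operator advances its event time to the minimum of its input watermarks, so the join cannot fire on one side's progress alone (a plain Python illustration of the rule, not Flink internals):

```python
def operator_watermark(left_wm, right_wm):
    # A two-input streaming operator's event time is the minimum
    # of its two input watermarks.
    return min(left_wm, right_wm)

# The right side has advanced to 101 while the left side is still at 0,
# so the operator's event time stays at 0 and nothing can be emitted yet.
print(operator_watermark(0, 101))  # -> 0
```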



Re: Watermarks in Event Time Temporal Join

2021-04-26 Thread Maciej Bryński
Hi Shengkai,
Thanks for the answer. The question is whether we need to determine if an
event in the main stream is late.
Look at the interval join: an event is emitted as soon as there is a
match between the left and right streams.
I agree the watermark should be passed on the versioned-table side, because
this is the only way to know which version of a record should be used.
But if we mimic the behaviour of the interval join, then the main stream's
watermark could be skipped.

Regards,
Maciek

On Mon, Apr 26, 2021 at 06:14, Shengkai Fang wrote:
>
> Hi, maverick.
>
> The watermark is used to determine whether a message is late or early. If we only 
> use the watermark on the versioned-table side, we have no means to determine 
> whether an event in the main stream is ready to be emitted.
>
> Best,
> Shengkai
>
> On Mon, Apr 26, 2021 at 2:31 AM, maverick wrote:
>>
>> Hi,
>> I'm curious why Event Time Temporal Join needs watermarks from both sides to
>> perform join.
>>
>> Shouldn't watermark on versioned table side be enough to perform join ?
>>
>>
>>
>>
>>
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



-- 
Maciek Bryński


Re: Watermarks in Event Time Temporal Join

2021-04-25 Thread Shengkai Fang
Hi, maverick.

The watermark is used to determine whether a message is late or early. If we only
use the watermark on the versioned-table side, we have no means to determine
whether an event in the main stream is ready to be emitted.

Best,
Shengkai

On Mon, Apr 26, 2021 at 2:31 AM, maverick wrote:

> Hi,
> I'm curious why Event Time Temporal Join needs watermarks from both sides
> to
> perform join.
>
> Shouldn't watermark on versioned table side be enough to perform join ?
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Watermarks in Event Time Temporal Join

2021-04-25 Thread maverick
Hi,
I'm curious why the event time temporal join needs watermarks from both sides to
perform the join.

Shouldn't the watermark on the versioned-table side be enough to perform the join?





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Time Temporal Join

2021-03-25 Thread Satyam Shekhar
Hi Timo,

Apologies for the late response. I somehow seem to have missed your reply.

I do want the join to be "time-based" since I need to perform a tumble
grouping operation on top of the join.

I tried setting the watermark strategy to `R` - INTERVAL '0.001' SECONDS, but
that didn't help either.

Note that we have a custom connector to an internal storage engine. The
connector implements the ScanTableSource interface with the
SupportsWatermarkPushDown ability. Would the watermark strategy in the
table schema matter in that case? I changed the query to the following to
simplify further -

select F.C0, F.C1, F.R, D.C0, D.C1, D.R from F JOIN D FOR SYSTEM_TIME AS OF
F.R ON F.C1 = D.C1

I still do not see any output from the pipeline. The overall logs I see
from the connector are the following -

Emit D.D row=+I(0,0,1970-01-01T00:00)@time=0  -->
ctx.collectWithTimestamp(row_,
rowtime);
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.D row=+I(1,1,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00)@time=0
Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F row=+I(3,3,1970-01-01T00:00:03)@time=3000
Emit D.D row=+I(3,3,1970-01-01T00:00)@time=0
Emit D.F row=+I(4,4,1970-01-01T00:00:04)@time=4000
Emit D.F wm=4000  --->  ctx.emitWatermark(new Watermark(wm));
Emit D.D wm=0

Now, if I change the rowtime of table D to 1s instead of 0, I get one row
as output.

Emit D.D row=+I(0,0,1970-01-01T00:00:01)@time=1000
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(3,3,1970-01-01T00:00:01)@time=1000
Emit D.F wm=1000
Emit D.D wm=1000

reply: (1, "1", 1000, 1, "1", 1000)

The next row streamed from F, which should join with a row emitted from D,
does not produce any output -

Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F wm=2000
NO REPLY

My understanding of temporal joins is that the latest row from D should be
picked when joining rows from F. Is my expectation wrong that the (2, 2, 2s)
row in F joins with the (2, 2, 1s) row in D?

Regards,
Satyam


On Tue, Mar 16, 2021 at 5:54 AM Timo Walther  wrote:

> Hi Satyam,
>
> first of all your initial join query can also work, you just need to
> make sure that no time attribute is in the SELECT clause. As the
> exception indicates, you need to cast all time attributes to TIMESTAMP.
> The reason for this is some major design issue that is also explained
> here where a time attribute must not be in the output of a regular join:
>
> https://stackoverflow.com/a/64500296/806430
>
> However, since you would like to perform the join "time-based" either
> interval join or temporal join might solve your use cases.
>
> In your case I guess the watermark strategy of D is the problem. Are you
> sure the result is:
>
>  > Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
>  > Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
>  > Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
>  > Emit D watermark=0
>
> and not:
>
>  > Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
>  > Emit D watermark=0
>  > Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
>  > Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
>
> Or maybe the watermark is even dropped. Could you try to use a watermark
> strategy with
>
> `R` - INTERVAL '0.001' SECONDS
>
> I hope this helps.
>
> Regards,
> Timo
>
>
>
> On 16.03.21 04:37, Satyam Shekhar wrote:
> > Hello folks,
> >
> > I would love to hear back your feedback on this.
> >
> > Regards,
> > Satyam
> >
> > On Wed, Mar 10, 2021 at 6:53 PM Satyam Shekhar  > <mailto:satyamshek...@gmail.com>> wrote:
> >
> > Hello folks,
> >
> > I am looking to enrich rows from an unbounded streaming table by
> > joining it with a bounded static table while preserving rowtime for
> > the streaming table. For example, let's consider table two tables F
> > and D, where F is unbounded and D is bounded. The schema for the two
> > tables is the following -
> >
> > F:
> >   |-- C0: BIGINT
> >   |-- C1: STRING
> >   |-- R: TIMESTAMP(3) **rowtime**
> >   |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
> >
> > D:
> >   |-- C0: BIGINT
> >   |-- C1: STRING NOT NULL
> >
> > I'd like to run the following query on this schema -
> >
> > select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
> >  from F join D ON F.C1 = D.C1
> >  group by 

Re: Time Temporal Join

2021-03-16 Thread Timo Walther

Hi Satyam,

first of all your initial join query can also work, you just need to 
make sure that no time attribute is in the SELECT clause. As the 
exception indicates, you need to cast all time attributes to TIMESTAMP. 
The reason for this is some major design issue that is also explained 
here where a time attribute must not be in the output of a regular join:


https://stackoverflow.com/a/64500296/806430

However, since you would like to perform the join "time-based" either 
interval join or temporal join might solve your use cases.


In your case I guess the watermark strategy of D is the problem. Are you 
sure the result is:


> Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
> Emit D watermark=0

and not:

> Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> Emit D watermark=0
> Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> Emit D row=+I(2,"2",1970-01-01T00:00)@time=0

Or maybe the watermark is even dropped. Could you try to use a watermark 
strategy with


`R` - INTERVAL '0.001' SECONDS

I hope this helps.

Regards,
Timo



On 16.03.21 04:37, Satyam Shekhar wrote:

Hello folks,

I would love to hear back your feedback on this.

Regards,
Satyam

On Wed, Mar 10, 2021 at 6:53 PM Satyam Shekhar <mailto:satyamshek...@gmail.com>> wrote:


Hello folks,

I am looking to enrich rows from an unbounded streaming table by
joining it with a bounded static table while preserving rowtime for
the streaming table. For example, let's consider table two tables F
and D, where F is unbounded and D is bounded. The schema for the two
tables is the following -

F:
  |-- C0: BIGINT
  |-- C1: STRING
  |-- R: TIMESTAMP(3) **rowtime**
  |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS

D:
  |-- C0: BIGINT
  |-- C1: STRING NOT NULL

I'd like to run the following query on this schema -

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
     from F join D ON F.C1 = D.C1
     group by D.C1, tumble(F.R, interval '1' second)

However, I run into the following error while running the above query -

"Rowtime attributes must not be in the input rows of a regular join.
As a workaround you can cast the time attributes of input tables to
TIMESTAMP before."

My understanding reading the docs is that Time Temporal Join is
meant to solve this problem. So I model table D as the following -

D:
  |-- C0: BIGINT
  |-- C1: STRING NOT NULL
  |-- R: TIMESTAMP(3)
  |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
  |-- CONSTRAINT 2da2dd2e-9937-48cb-9dec-4f6055713004 PRIMARY KEY (C1)

With column D.R always set to 0 and modify the query as follows -

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
     from F join D FOR SYSTEM_TIME AS OF F.R ON F.C1 = D.C1
     group by D.C1, tumble(F.R, interval '1' second)

The above query runs but does not return any result. I have the
following data in D initially -
Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
Emit D watermark=0

And F streams the following rows -
Emit F row=+I(0,"0",1970-01-01T00:00)@time=0
Emit F row=+I(1,"1",1970-01-01T00:00:10)@time=1000
Emit F watermark=1000

I expect that two rows in F will join with matching rows (on C1) in
D and produce some output. But I do not see anything in the output.

So I have the following questions -

1. Is time temporal join the correct tool to solve this problem?
2. What could be the reason for not getting any output rows in the
result?

Thanks,
Satyam





Re: Time Temporal Join

2021-03-15 Thread Satyam Shekhar
Hello folks,

I would love to hear back your feedback on this.

Regards,
Satyam

On Wed, Mar 10, 2021 at 6:53 PM Satyam Shekhar 
wrote:

> Hello folks,
>
> I am looking to enrich rows from an unbounded streaming table by
> joining it with a bounded static table while preserving rowtime for the
> streaming table. For example, let's consider table two tables F and D,
> where F is unbounded and D is bounded. The schema for the two tables is the
> following -
>
> F:
>  |-- C0: BIGINT
>  |-- C1: STRING
>  |-- R: TIMESTAMP(3) **rowtime**
>  |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
>
> D:
>  |-- C0: BIGINT
>  |-- C1: STRING NOT NULL
>
> I'd like to run the following query on this schema -
>
> select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
> from F join D ON F.C1 = D.C1
> group by D.C1, tumble(F.R, interval '1' second)
>
> However, I run into the following error while running the above query -
>
> "Rowtime attributes must not be in the input rows of a regular join. As a
> workaround you can cast the time attributes of input tables to TIMESTAMP
> before."
>
> My understanding reading the docs is that Time Temporal Join is meant to
> solve this problem. So I model table D as the following -
>
> D:
>  |-- C0: BIGINT
>  |-- C1: STRING NOT NULL
>  |-- R: TIMESTAMP(3)
>  |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
>  |-- CONSTRAINT 2da2dd2e-9937-48cb-9dec-4f6055713004 PRIMARY KEY (C1)
>
> With column D.R always set to 0 and modify the query as follows -
>
> select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
> from F join D FOR SYSTEM_TIME AS OF F.R ON F.C1 = D.C1
> group by D.C1, tumble(F.R, interval '1' second)
>
> The above query runs but does not return any result. I have the following
> data in D initially -
> Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
> Emit D watermark=0
>
> And F streams the following rows -
> Emit F row=+I(0,"0",1970-01-01T00:00)@time=0
> Emit F row=+I(1,"1",1970-01-01T00:00:10)@time=1000
> Emit F watermark=1000
>
> I expect that two rows in F will join with matching rows (on C1) in D and
> produce some output. But I do not see anything in the output.
>
> So I have the following questions -
>
> 1. Is time temporal join the correct tool to solve this problem?
> 2. What could be the reason for not getting any output rows in the result?
>
> Thanks,
> Satyam
>
>


Time Temporal Join

2021-03-10 Thread Satyam Shekhar
Hello folks,

I am looking to enrich rows from an unbounded streaming table by joining it
with a bounded static table while preserving rowtime for the streaming
table. For example, let's consider two tables F and D, where F is
unbounded and D is bounded. The schema for the two tables is the following -

F:
 |-- C0: BIGINT
 |-- C1: STRING
 |-- R: TIMESTAMP(3) **rowtime**
 |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS

D:
 |-- C0: BIGINT
 |-- C1: STRING NOT NULL

I'd like to run the following query on this schema -

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
from F join D ON F.C1 = D.C1
group by D.C1, tumble(F.R, interval '1' second)

However, I run into the following error while running the above query -

"Rowtime attributes must not be in the input rows of a regular join. As a
workaround you can cast the time attributes of input tables to TIMESTAMP
before."

My understanding from reading the docs is that Time Temporal Join is meant to
solve this problem. So I model table D as the following -

D:
 |-- C0: BIGINT
 |-- C1: STRING NOT NULL
 |-- R: TIMESTAMP(3)
 |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
 |-- CONSTRAINT 2da2dd2e-9937-48cb-9dec-4f6055713004 PRIMARY KEY (C1)

With column D.R always set to 0 and modify the query as follows -

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
from F join D FOR SYSTEM_TIME AS OF F.R ON F.C1 = D.C1
group by D.C1, tumble(F.R, interval '1' second)

The above query runs but does not return any result. I have the following
data in D initially -
Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
Emit D watermark=0

And F streams the following rows -
Emit F row=+I(0,"0",1970-01-01T00:00)@time=0
Emit F row=+I(1,"1",1970-01-01T00:00:10)@time=1000
Emit F watermark=1000

I expect the two rows in F to join with matching rows (on C1) in D and
produce some output. But I do not see anything in the output.

So I have the following questions -

1. Is time temporal join the correct tool to solve this problem?
2. What could be the reason for not getting any output rows in the result?

Thanks,
Satyam


Re: Processing-time temporal join is not supported yet.

2021-03-04 Thread Leonard Xu
Sorry, I meant that you can create a UDTF that caches the data from your files 
and then enrich your stream with the LATERAL TABLE syntax.

BTW, you can use FileSystemLookupFunction.java [1] as a reference. If we plan to 
support lookup for the filesystem connector, we should use this function too.

Best,
Leonard

[1] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
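
The cache-in-a-UDF workaround can be sketched outside Flink as follows; the one-`id,value`-per-line file format and the class shape are assumptions made for this sketch (a real Flink UDTF would be written against the Table API's TableFunction):

```python
class CachedLookup:
    """Load a small static file into memory once, then enrich
    streaming records by key."""

    def __init__(self, lines):
        # Build the cache once; each line is assumed to be "id,value".
        self.cache = dict(line.split(",", 1) for line in lines)

    def eval(self, key):
        # Return the cached value for the key, or None if absent.
        return self.cache.get(key)

lookup = CachedLookup(["id1,red", "id2,blue"])
print(lookup.eval("id2"))  # -> blue
print(lookup.eval("id9"))  # -> None
```

The design choice is the same as in the Hive lookup path referenced above: pay the file-scan cost once at startup, then serve each probe from memory.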

 

> On Mar 4, 2021, at 19:26, eric hoffmann wrote:
> 
> Thx Leonard,
> by UDF you mean a custom table source on s3?
> 
> On Thu, Mar 4, 2021 at 05:31, Leonard Xu <mailto:xbjt...@gmail.com> wrote:
> Hi, Eric
> 
>> what would be the best workaround to enrich a stream of data from a Kafka 
>> topic with static data based on id?
> Currently you can put your static data in Hive/JDBC/HBase, which support 
> looking up the data in the Table environment, as a workaround.
> You can also write a UDF that caches the S3 files and can be used to enrich 
> your stream data.
> 
> Best,
> Leonard
> 
>> 
>> 
> On Sat, Feb 27, 2021 at 05:15, Leonard Xu <mailto:xbjt...@gmail.com> wrote:
>> Hi, Eric
>> 
>> Firstly, FileSystemTableSource does not implement LookupTableSource, which 
>> means we cannot directly look up a filesystem table.
>> 
>> In FLINK-19830, we planned to support processing-time temporal joins against 
>> any table/view by looking up the data in join operator state scanned from the 
>> filesystem table, but as the issue describes, join processing for the left 
>> stream doesn't wait for the complete snapshot of the temporal table, which may 
>> mislead users in a production environment.  
>> E.g. your S3 table has 1000 records, but the join operator does not know when 
>> all records have arrived, so the correlation may be incorrect; thus we 
>> disabled this feature.
>> 
>> I think we can implement LookupTableSource for FileSystemTableSource; 
>> after that, we can directly look up a filesystem table. The 
>> implementation will be similar to the Hive table, where we cache all data of 
>> the files and then look up the cache. Could you help create a JIRA ticket for 
>> this?
>> 
>> 
>> Best,
>> Leonard 
>> 
>> 
>>> On Feb 26, 2021, at 23:41, Matthias Pohl <mailto:matth...@ververica.com> wrote:
>>> 
>>> Hi Eric,
>>> it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the 
>>> thread. Maybe, he has a workaround for your case.
>>> 
>>> Best,
>>> Matthias
>>> 
>>> [1] https://issues.apache.org/jira/browse/FLINK-19830 
>>> <https://issues.apache.org/jira/browse/FLINK-19830>
>>> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann >> <mailto:sigfrid.hoffm...@gmail.com>> wrote:
>>> Hello
>>> Working with Flink 1.12.1, I read in the docs that a processing-time temporal 
>>> join is supported for KV-like joins, but when I try it I get:
>>> 
>>> Exception in thread "main" org.apache.flink.table.api.TableException: 
>>> Processing-time temporal join is not supported yet.
>>> at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
>>> at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
>>> at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
>>> at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
>>> at 
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>> at 
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>>> at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
>>> at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>>> at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>>> at 
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.sc

Re: Processing-time temporal join is not supported yet.

2021-03-04 Thread eric hoffmann
Thx Leonard,
by UDF you mean a custom table source on s3?

On Thu, Mar 4, 2021 at 05:31, Leonard Xu wrote:

> Hi, Eric
>
> what would be the best workaround to enrich a stream of data from a Kafka
> topic with static data based on id?
>
> Currently you can put your static data in Hive/JDBC/HBase, which support
> looking up the data in the Table environment, as a workaround.
> You can also write a UDF that caches the S3 files and can be used to
> enrich your stream data.
>
> Best,
> Leonard
>
>
>
> On Sat, Feb 27, 2021 at 05:15, Leonard Xu wrote:
>
>> Hi, Eric
>>
>> Firstly, FileSystemTableSource does not implement LookupTableSource, which
>> means we cannot directly look up a filesystem table.
>>
>> In FLINK-19830, we planned to support processing-time temporal joins against
>> any table/view by looking up the data in join operator state scanned from
>> the filesystem table, but as the issue describes, join processing for the
>> left stream doesn't wait for the complete snapshot of the temporal table, this
>> may mislead users in a production environment.
>> E.g. your S3 table has 1000 records, but the join operator does not know
>> when all records have arrived, so the correlation may be incorrect; thus we
>> disabled this feature.
>>
>> I think we can implement LookupTableSource for FileSystemTableSource;
>> after that, we can directly look up a filesystem table. The
>> implementation will be similar to the Hive table, where we cache all data of the
>> files and then look up the cache. Could you help create a JIRA ticket for
>> this?
>>
>>
>> Best,
>> Leonard
>>
>>
>> On Feb 26, 2021, at 23:41, Matthias Pohl wrote:
>>
>> Hi Eric,
>> it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the
>> thread. Maybe, he has a workaround for your case.
>>
>> Best,
>> Matthias
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-19830
>>
>> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <
>> sigfrid.hoffm...@gmail.com> wrote:
>>
>>> Hello
>>> Working with Flink 1.12.1, I read in the docs that a processing-time
>>> temporal join is supported for KV-like joins, but when I try it I get:
>>>
>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>> Processing-time temporal join is not supported yet.
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>> at
>>> org.apache.flink.table.planner.plan.no

Re: Processing-time temporal join is not supported yet.

2021-03-03 Thread Leonard Xu
Hi, Eric

> what would be the best workaround to enrich a stream of data from a Kafka topic 
> with static data based on id?
Currently you can put your static data in Hive/JDBC/HBase, which support 
looking up the data in the Table environment, as a workaround.
You can also write a UDF that caches the S3 files and can be used to enrich 
your stream data.

Best,
Leonard

> 
> 
> On Sat, Feb 27, 2021 at 05:15, Leonard Xu <mailto:xbjt...@gmail.com> wrote:
> Hi, Eric
> 
> Firstly, FileSystemTableSource does not implement LookupTableSource, which means
> we cannot directly lookup a filesystem table.
>
> In FLINK-19830, we plan to support processing-time temporal joins against any
> table/view by looking up the data in join operator state scanned from the
> filesystem table. But as the issue describes, join processing for the left stream
> doesn't wait for the complete snapshot of the temporal table, which may mislead
> users in a production environment.
> E.g.: your s3 table has 1000 records, but the join operator does not know when
> all records have arrived, so the correlation may be incorrect; thus we disabled
> this feature.
>
> I think we can implement LookupTableSource for FileSystemTableSource; after
> that, we can directly lookup a filesystem table. The implementation will be
> similar to the Hive table, where we cache all the data of the files and then
> look up the cache. Could you help create a JIRA ticket for this?
> 
> 
> Best,
> Leonard 
> 
> 
>> On Feb 26, 2021, at 23:41, Matthias Pohl wrote:
>> 
>> Hi Eric,
>> it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the 
>> thread. Maybe, he has a workaround for your case.
>> 
>> Best,
>> Matthias
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-19830
>> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann wrote:
>> Hello
>> Working with Flink 1.12.1, I read in the docs that processing-time temporal
>> join is supported for key/value-style joins, but when I try it I get:
>> 
>> Exception in thread "main" org.apache.flink.table.api.TableException: 
>> Processing-time temporal join is not supported yet.

Re: Processing-time temporal join is not supported yet.

2021-03-03 Thread eric hoffmann
Hi Leonard,
Thanks for your reply.
No problem to help on the JIRA topic.
In my situation, in a full SQL environment, what would be the best workaround to
enrich a stream of data from a Kafka topic with static data based on id?
I know how to do it in the DataStream API.
eric

On Sat, Feb 27, 2021 at 05:15, Leonard Xu wrote:

> Hi, Eric
>
> Firstly, FileSystemTableSource does not implement LookupTableSource, which
> means we cannot directly lookup a filesystem table.
>
> In FLINK-19830, we plan to support processing-time temporal joins against any
> table/view by looking up the data in join operator state scanned from
> the filesystem table. But as the issue describes, join processing for the
> left stream doesn't wait for the complete snapshot of the temporal table, which
> may mislead users in a production environment.
> E.g.: your s3 table has 1000 records, but the join operator does not know
> when all records have arrived, so the correlation may be incorrect; thus we
> disabled this feature.
>
> I think we can implement LookupTableSource for FileSystemTableSource; after
> that, we can directly lookup a filesystem table. The implementation will be
> similar to the Hive table, where we cache all the data of the files and then
> look up the cache. Could you help create a JIRA ticket for this?
>
>
> Best,
> Leonard
>
>
> On Feb 26, 2021, at 23:41, Matthias Pohl wrote:
>
> Hi Eric,
> it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the
> thread. Maybe, he has a workaround for your case.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-19830
>
> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann 
> wrote:
>
>> Hello
>> Working with Flink 1.12.1, I read in the docs that processing-time temporal
>> join is supported for key/value-style joins, but when I try it I get:
>>
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> Processing-time temporal join is not supported yet.
>>
>> my query:
>>
>> SELECT e.id, r.test
>> FROM kafkaTable AS e
>> JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime AS r
>>   ON e.id = r.id
>>
>> my s3 table:
>>
>> CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
>>   WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')
>>
>> my kafka table:
>>
>> CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT,
>> proctime AS PROCTIME())
>>
>>   WITH ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='127.0.0.1:9092','properties.group.id'='mygroup','format'='json','scan.startup.mode'='group-offsets','properties.enable.auto.commit'='false')
>>
>>
>
>


Re: Processing-time temporal join is not supported yet.

2021-02-26 Thread Leonard Xu
Hi, Eric

Firstly, FileSystemTableSource does not implement LookupTableSource, which means
we cannot directly lookup a filesystem table.

In FLINK-19830, we plan to support processing-time temporal joins against any
table/view by looking up the data in join operator state scanned from the
filesystem table. But as the issue describes, join processing for the left stream
doesn't wait for the complete snapshot of the temporal table, which may mislead
users in a production environment.
E.g.: your s3 table has 1000 records, but the join operator does not know when
all records have arrived, so the correlation may be incorrect; thus we disabled
this feature.

I think we can implement LookupTableSource for FileSystemTableSource; after
that, we can directly lookup a filesystem table. The implementation will be
similar to the Hive table, where we cache all the data of the files and then
look up the cache. Could you help create a JIRA ticket for this?


Best,
Leonard 
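For comparison, the event-time temporal join IS supported against any table/view. A hedged sketch of that alternative, assuming the dimension data arrives as a changelog (the upsert-kafka connector, topic name, and the `update_time`/`row_time` columns are illustrative assumptions; the left-side kafkaTable would also need an event-time attribute with a watermark, which the original DDL lacks):

```sql
-- Versioned dimension table: primary key + event-time watermark make it
-- joinable with FOR SYSTEM_TIME AS OF an event-time attribute.
CREATE TABLE versioned_dim (
  id STRING,
  test STRING,
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'dim_changelog',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Old versions of each key are cleaned up as watermarks advance on both sides,
-- so state stays bounded to the latest version per key.
SELECT e.id, r.test
FROM kafkaTable AS e
JOIN versioned_dim FOR SYSTEM_TIME AS OF e.row_time AS r
  ON e.id = r.id;
```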


> On Feb 26, 2021, at 23:41, Matthias Pohl wrote:
> 
> Hi Eric,
> it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the 
> thread. Maybe, he has a workaround for your case.
> 
> Best,
> Matthias
> 
> [1] https://issues.apache.org/jira/browse/FLINK-19830
> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann wrote:
> Hello
> Working with Flink 1.12.1, I read in the docs that processing-time temporal
> join is supported for key/value-style joins, but when I try it I get:
> 
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> Processing-time temporal join is not supported yet.
> 
> my query:
> 
> SELECT e.id, r.test
> FROM kafkaTable AS e
> JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime AS r
>   ON e.id = r.id
> 
> my s3 table:
> 
> CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
>   WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')
> 
> my kafka table:
> 
> CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT, proctime AS PROCTIME())
>   WITH ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='127.0.0.1:9092','properties.group.id'='mygroup','format'='json','scan.startup.mode'='group-offsets','properties.enable.auto.commit'='false')
> 
> 



Re: Processing-time temporal join is not supported yet.

2021-02-26 Thread Matthias Pohl
Hi Eric,
it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the
thread. Maybe, he has a workaround for your case.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-19830

On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann 
wrote:

> Hello
> Working with Flink 1.12.1, I read in the docs that processing-time temporal
> join is supported for key/value-style joins, but when I try it I get:
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Processing-time temporal join is not supported yet.
>
> my query:
>
> SELECT e.id, r.test
> FROM kafkaTable AS e
> JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime AS r
>   ON e.id = r.id
>
> my s3 table:
>
> CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
>   WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')
>
> my kafka table:
>
> CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT,
> proctime AS PROCTIME())
>
>   WITH ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='127.0.0.1:9092','properties.group.id'='mygroup','format'='json','scan.startup.mode'='group-offsets','properties.enable.auto.commit'='false')
>
>


Processing-time temporal join is not supported yet.

2021-02-26 Thread eric hoffmann
Hello
Working with Flink 1.12.1, I read in the docs that processing-time temporal
join is supported for key/value-style joins, but when I try it I get:

Exception in thread "main" org.apache.flink.table.api.TableException:
Processing-time temporal join is not supported yet.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)

my query:

SELECT e.id, r.test
FROM kafkaTable AS e
JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime AS r
  ON e.id = r.id

my s3 table:

CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
  WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')

my kafka table:

CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT,
proctime AS PROCTIME())
  WITH ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='127.0.0.1:9092','properties.group.id'='mygroup','format'='json','scan.startup.mode'='group-offsets','properties.enable.auto.commit'='false')
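If the S3 data is genuinely static, one further fallback (an editorial sketch, not suggested in the thread) is a regular streaming join without `FOR SYSTEM_TIME`. It avoids the lookup requirement entirely, but the semantics differ from a temporal join: both inputs are kept in operator state, and late updates to the dimension side retract and re-emit results.

```sql
-- Regular (non-temporal) join: suitable only for small, effectively static
-- dimension tables, since both sides are held in state indefinitely.
-- Consider bounding state via the table.exec.state.ttl configuration.
SELECT e.id, r.test
FROM kafkaTable AS e
JOIN s3Table AS r
  ON e.id = r.id;
```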