Hi, Eric

Firstly, FileSystemTableSource does not implement LookupTableSource, which means 
we cannot directly look up a filesystem table.
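
For context, the contract a connector has to provide for lookup joins is roughly 
the following (simplified; it mirrors the existing 
org.apache.flink.table.connector.source.LookupTableSource interface):

public interface LookupTableSource extends DynamicTableSource {
    // Returns the runtime implementation (e.g. a TableFunction) that the
    // planner uses to look up rows by the join key.
    LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
}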

In FLINK-19830, we plan to support processing-time temporal joins against any 
table/view by looking the data up in the join operator's state, which is populated 
by scanning the filesystem table. But, as the issue describes, the join processing 
for the left stream does not wait for a complete snapshot of the temporal table, 
which may mislead users in a production environment.
E.g.: your s3 table has 1000 records, but the join operator does not know when all 
of them have arrived, so the correlation may be incorrect. That is why we disabled 
this feature.

I think we can implement LookupTableSource for FileSystemTableSource now; after 
that, we can directly look up a filesystem table. The implementation would be 
similar to the Hive table source, where we cache all the data from the files and 
then look up the cache. Could you help create a JIRA ticket for this?
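
A rough sketch of that direction, assuming we mirror the Hive connector's approach 
of loading all rows from the files into an in-memory cache. Only TableFunction, 
FunctionContext, RowData, and GenericRowData are existing Flink classes; the class 
name and the helper methods are hypothetical placeholders:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FileSystemLookupFunction extends TableFunction<RowData> {

    // join key row -> matching rows, built once when the function is opened
    private transient Map<RowData, List<RowData>> cache;

    @Override
    public void open(FunctionContext context) throws Exception {
        cache = new HashMap<>();
        // Scan every file under the table path (e.g. s3a://fs/) once and
        // index the rows by their lookup key.
        for (RowData row : readAllRows()) {
            cache.computeIfAbsent(extractKey(row), k -> new ArrayList<>()).add(row);
        }
    }

    // Called by the lookup join for every probe-side record with its key values.
    public void eval(Object... keys) {
        RowData keyRow = GenericRowData.of(keys);
        for (RowData row : cache.getOrDefault(keyRow, Collections.emptyList())) {
            collect(row);
        }
    }

    private List<RowData> readAllRows() {
        // Placeholder: would scan and deserialize all files of the table
        // using its configured format; returns nothing in this sketch.
        return Collections.emptyList();
    }

    private RowData extractKey(RowData row) {
        // Placeholder: should project the lookup key fields into a key row;
        // this sketch just returns the full row.
        return row;
    }
}

FileSystemTableSource#getLookupRuntimeProvider(...) could then return 
TableFunctionProvider.of(new FileSystemLookupFunction(...)) to wire such a 
function into lookup joins.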


Best,
Leonard 


> On Feb 26, 2021, at 23:41, Matthias Pohl <matth...@ververica.com> wrote:
> 
> Hi Eric,
> it looks like you ran into FLINK-19830 [1]. I'm going to add Leonard to the 
> thread. Maybe he has a workaround for your case.
> 
> Best,
> Matthias
> 
> [1] https://issues.apache.org/jira/browse/FLINK-19830
> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <sigfrid.hoffm...@gmail.com> wrote:
> Hello,
> Working with Flink 1.12.1, I read in the docs that the processing-time temporal 
> join is supported for KV-like joins, but when I try it I get:
> 
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> Processing-time temporal join is not supported yet.
>         at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
>         at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
>         at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
>         at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
>         at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>         at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>         at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
>         at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>         at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>         at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>         at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>         at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>         at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
>         at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
>         at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>         at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>         at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
>         at 
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
> 
> my query:
> 
> SELECT e.id, r.test FROM kafkaTable AS e JOIN s3Table FOR 
> SYSTEM_TIME AS OF e.proctime AS r ON e.id = r.id
> 
> my s3 table:
> 
> CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
>       WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')
> 
> my kafka table:
> 
> CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT, proctime AS 
> PROCTIME())
>       WITH 
> ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='127.0.0.1:9092',
> 'properties.group.id'='mygroup','format'='json','scan.startup.mode'='group-offsets',
> 'properties.enable.auto.commit'='false')
> 
> 
