Sorry, I meant you can create a UDTF that caches the data from your files and then 
enrich your stream with the LATERAL TABLE syntax.
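
A minimal sketch of such a UDTF follows (the class name S3CacheLookupFunction, the 
ROW<id STRING, test STRING> output schema and the loadStaticData() helper are only 
illustrative names, not existing Flink code; you would plug in your own s3/JSON 
reading logic):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// UDTF that keeps the static file content in memory and emits the matching row for an id
@FunctionHint(output = @DataTypeHint("ROW<id STRING, test STRING>"))
public class S3CacheLookupFunction extends TableFunction<Row> {

    private transient Map<String, String> cache;

    @Override
    public void open(FunctionContext context) throws Exception {
        // load the file(s) once per task instance when the function is opened
        cache = loadStaticData("s3a://fs/");
    }

    public void eval(String id) {
        // emit at most one row per input id, taken from the in-memory cache
        String test = cache.get(id);
        if (test != null) {
            collect(Row.of(id, test));
        }
    }

    private Map<String, String> loadStaticData(String path) {
        // placeholder: read and parse your JSON file(s) under `path`
        // into an id -> test map with whatever client/format you use
        return new HashMap<>();
    }
}

After registering it, e.g. tableEnv.createTemporarySystemFunction("s3_lookup", 
S3CacheLookupFunction.class), the join would look like:
SELECT e.id, r.test FROM kafkaTable AS e, LATERAL TABLE(s3_lookup(e.id)) AS r(rid, test)
Note the cache is loaded once in open(), so changes to the files are only picked up 
when the job restarts.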

BTW, you can take FileSystemLookupFunction.java [1] as a reference. If we plan to 
support lookup for the filesystem connector, we should reuse this function as well.

Best,
Leonard

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java

 

> On Mar 4, 2021, at 19:26, eric hoffmann <sigfrid.hoffm...@gmail.com> wrote:
> 
> Thx Leonard,
> by UDF, do you mean a custom table source on s3?
> 
> On Thu, Mar 4, 2021 at 05:31, Leonard Xu <xbjt...@gmail.com> wrote:
> Hi, Eric
> 
>> what would be the best workaround to enrich a stream of data from a Kafka topic 
>> with static data based on an id?
> Currently, as a workaround, you can put your static data in Hive/JDBC/HBase, which 
> support looking up the data in the Table environment.
> You can also write a UDF that caches the s3 files and use it to enrich your stream 
> data.
> 
> Best,
> Leonard
> 
>> 
>> 
>> On Sat, Feb 27, 2021 at 05:15, Leonard Xu <xbjt...@gmail.com> wrote:
>> Hi, Eric
>> 
>> Firstly, FileSystemTableSource does not implement LookupTableSource, which means 
>> we cannot directly look up a filesystem table.
>> 
>> In FLINK-19830, we planned to support processing-time temporal joins against any 
>> table/view by looking up the data in the join operator state, which is scanned from 
>> the filesystem table. But as the issue describes, the join processing for the left 
>> stream does not wait for the complete snapshot of the temporal table, which may 
>> mislead users in a production environment.
>> E.g.: your s3 table has 1000 records, but the join operator does not know when all 
>> records have arrived, so the correlation may be incorrect. That is why we disabled 
>> this feature.
>> 
>> I think we can implement LookupTableSource for FileSystemTableSource; after that, 
>> we can directly look up a filesystem table. The implementation will be similar to 
>> the Hive table, where we cache all the data of the files and then look up the 
>> cache. Could you help create a JIRA ticket for this?
>> 
>> 
>> Best,
>> Leonard 
>> 
>> 
>>> On Feb 26, 2021, at 23:41, Matthias Pohl <matth...@ververica.com> wrote:
>>> 
>>> Hi Eric,
>>> it looks like you ran into FLINK-19830 [1]. I'm going to add Leonard to the 
>>> thread. Maybe he has a workaround for your case.
>>> 
>>> Best,
>>> Matthias
>>> 
>>> [1] https://issues.apache.org/jira/browse/FLINK-19830
>>> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <sigfrid.hoffm...@gmail.com> wrote:
>>> Hello,
>>> Working with Flink 1.12.1, I read in the docs that a processing-time temporal 
>>> join is supported for KV-like joins, but when I try it I get:
>>> 
>>> Exception in thread "main" org.apache.flink.table.api.TableException: 
>>> Processing-time temporal join is not supported yet.
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>>>         at 
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
>>>         at 
>>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
>>> 
>>> my query:
>>> 
>>> SELECT e.id, r.test FROM kafkaTable as e JOIN s3Table FOR SYSTEM_TIME AS OF 
>>> e.proctime AS r ON e.id = r.id
>>> 
>>> my s3 table:
>>> 
>>> CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
>>>       WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')
>>> 
>>> my kafka table:
>>> 
>>> CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT, proctime AS 
>>> PROCTIME())
>>>       WITH ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='127.0.0.1:9092',
>>>       'properties.group.id'='mygroup','format'='json','scan.startup.mode'='group-offsets',
>>>       'properties.enable.auto.commit'='false')
>>> 
>>> 
>> 
> 
