Thanks Leonard,
by UDF do you mean a custom table source on s3?

On Thu, Mar 4, 2021 at 5:31 AM Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, Eric
>
> What would be the best workaround to enrich a stream of data from a Kafka
> topic with static data based on an id?
>
> Currently, as a workaround, you can put your static data in Hive/JDBC/HBase,
> whose connectors support looking the data up in the Table environment.
> You can also write a UDF that caches the s3 files and use it to
> enrich your stream data.
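>
> For illustration, a minimal sketch of such a caching UDF (untested; the
> S3 path, the one-"id,value"-pair-per-line file layout, and the class
> name are assumptions for the example, not part of any existing API):
>
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.table.functions.FunctionContext;
> import org.apache.flink.table.functions.ScalarFunction;
>
> import java.io.BufferedReader;
> import java.io.InputStreamReader;
> import java.util.HashMap;
> import java.util.Map;
>
> public class S3EnrichFunction extends ScalarFunction {
>
>     // Per-task cache of the static data, loaded once in open().
>     private transient Map<String, String> cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         cache = new HashMap<>();
>         Path path = new Path("s3a://fs/lookup.csv"); // hypothetical file
>         FileSystem fs = path.getFileSystem();
>         try (BufferedReader reader = new BufferedReader(
>                 new InputStreamReader(fs.open(path)))) {
>             String line;
>             while ((line = reader.readLine()) != null) {
>                 String[] parts = line.split(",", 2);
>                 cache.put(parts[0], parts[1]);
>             }
>         }
>     }
>
>     // Called per record: return the enrichment value for the given id.
>     public String eval(String id) {
>         return cache.get(id);
>     }
> }
>
> You would register it with
> tableEnv.createTemporarySystemFunction("enrich", S3EnrichFunction.class)
> and then call it as enrich(e.id) in your query instead of the join.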
>
> Best,
> Leonard
>
>
>
> On Sat, Feb 27, 2021 at 5:15 AM Leonard Xu <xbjt...@gmail.com> wrote:
>
>> Hi, Eric
>>
>> Firstly, FileSystemTableSource does not implement LookupTableSource, which
>> means we cannot directly look up a filesystem table.
>>
>> In FLINK-19830, we plan to support processing-time temporal joins against
>> any table/view by looking the data up in the join operator's state, which
>> is scanned from the filesystem table. But, as the issue describes, the
>> join processing for the left stream doesn't wait for the complete snapshot
>> of the temporal table, and this may mislead users in production
>> environments.
>> E.g. if your s3 table has 1000 records, the join operator does not know
>> when all records have arrived, so the correlation may be incorrect; thus
>> we disabled this feature.
>>
>> I think we can implement LookupTableSource for FileSystemTableSource;
>> after that, we can directly look up a filesystem table. The implementation
>> would be similar to the Hive table, where we cache all the data from the
>> files and then look up the cache. Could you help create a JIRA ticket for
>> this?
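>>
>> For what it's worth, a rough sketch of the lookup side (the class name
>> and the eager-caching strategy are assumptions modeled on the Hive
>> lookup function, not an existing Flink filesystem API):
>>
>> import org.apache.flink.table.data.RowData;
>> import org.apache.flink.table.data.StringData;
>> import org.apache.flink.table.functions.FunctionContext;
>> import org.apache.flink.table.functions.TableFunction;
>>
>> import java.util.HashMap;
>> import java.util.Map;
>>
>> public class FileSystemLookupFunction extends TableFunction<RowData> {
>>
>>     // All rows of the filesystem table, indexed by the lookup key.
>>     private transient Map<StringData, RowData> cache;
>>
>>     @Override
>>     public void open(FunctionContext context) throws Exception {
>>         cache = new HashMap<>();
>>         // Hypothetical: scan the table's files once here and index
>>         // every row by its lookup key, as the Hive connector does.
>>     }
>>
>>     // Probed by the join operator with the key of each left-side row.
>>     public void eval(StringData id) {
>>         RowData row = cache.get(id);
>>         if (row != null) {
>>             collect(row);
>>         }
>>     }
>> }
>>
>> FileSystemTableSource would then implement LookupTableSource and return
>> TableFunctionProvider.of(new FileSystemLookupFunction()) from
>> getLookupRuntimeProvider.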
>>
>>
>> Best,
>> Leonard
>>
>>
>> On Feb 26, 2021, at 23:41, Matthias Pohl <matth...@ververica.com> wrote:
>>
>> Hi Eric,
>> it looks like you ran into FLINK-19830 [1]. I'm going to add Leonard to
>> the thread. Maybe he has a workaround for your case.
>>
>> Best,
>> Matthias
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-19830
>>
>> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <
>> sigfrid.hoffm...@gmail.com> wrote:
>>
>>> Hello,
>>> Working with Flink 1.12.1, I read in the docs that processing-time
>>> temporal joins are supported for key/value-like joins, but when I try
>>> one I get:
>>>
>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>> Processing-time temporal join is not supported yet.
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>>>         at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
>>>         at
>>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
>>>
>>> my query:
>>>
>>> SELECT e.id, r.test
>>> FROM kafkaTable AS e
>>> JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime AS r
>>> ON e.id = r.id
>>>
>>> my s3 table:
>>>
>>> CREATE TABLE s3Table (
>>>   id STRING,
>>>   test STRING,
>>>   PRIMARY KEY (id) NOT ENFORCED
>>> ) WITH (
>>>   'connector' = 'filesystem',
>>>   'path' = 's3a://fs/',
>>>   'format' = 'json'
>>> )
>>>
>>> my kafka table:
>>>
>>> CREATE TABLE kafkaTable (
>>>   id STRING,
>>>   foo STRING,
>>>   bar BIGINT,
>>>   proctime AS PROCTIME()
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'mytopic',
>>>   'properties.bootstrap.servers' = '127.0.0.1:9092',
>>>   'properties.group.id' = 'mygroup',
>>>   'format' = 'json',
>>>   'scan.startup.mode' = 'group-offsets',
>>>   'properties.enable.auto.commit' = 'false'
>>> )
>>>
>>>
>>
>>
>
