Hi Eric,
it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the
thread. Maybe, he has a workaround for your case.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-19830

On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <sigfrid.hoffm...@gmail.com>
wrote:

> Hello
> Working with flink 1.12.1 i read in the doc that Processing-time temporal
> join is supported for kv like join but when i try i get a:
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Processing-time temporal join is not supported yet.
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
>         at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>         at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>         at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>         at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
>         at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>         at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
>         at
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
>
> my query:
>
> SELECT e.id
> , r.test FROM kafkaTable as e JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime 
> AS r ON
> e.id = r.id
>
> my s3 table:
>
> CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
>       WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')
>
> my kafka table:
>
> CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT,
> proctime AS PROCTIME())
>
>       WITH 
> ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='
> 127.0.0.1:9092','properties.group.id
> '='mygroup','format'='json','scan.startup.mode'='group-offsets', 
> 'properties.enable.auto.commit'='false')
>
>

Reply via email to