Yes, it should be AS PROCTIME(); that was my typo. But the same error is still reported.
________________________________
From: Benchao Li <libenc...@gmail.com>
Sent: May 18, 2020 12:59
To: user-zh <user-zh@flink.apache.org>
Subject: Re: Flink SQL dimension-table join reports: Temporal table join currently only supports 'FOR
SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'

The DDL you posted the second time also seems to have a problem. Shouldn't it be `proctime AS PROCTIME(),`?

wind.fly....@outlook.com <wind.fly....@outlook.com> wrote on Mon, May 18, 2020 at 9:48 AM:

> Sorry, I copied the wrong CREATE TABLE statement earlier. It should be this:
>         CREATE TABLE x.log.yanfa_log (
>     dt TIMESTAMP(3),
>     conn_id STRING,
>     sequence STRING,
>     trace_id STRING,
>     span_info STRING,
>     service_id STRING,
>     msg_id STRING,
>     servicename STRING,
>     ret_code STRING,
>     duration STRING,
>     req_body MAP<STRING,STRING>,
>     res_body MAP<STRING,STRING>,
>     extra_info MAP<STRING,STRING>,
>     proctime PROCTIME(),
>     WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
> ) WITH (
>     'connector.type' = 'kafka',
>     'connector.version' = '0.11',
>     'connector.topic' = 'x-log-yanfa_log',
>     'connector.properties.bootstrap.servers' = '******:9092',
>     'connector.properties.zookeeper.connect' = '******:2181',
>     'connector.startup-mode' = 'latest-offset',
>     'update-mode' = 'append',
>     'format.type' = 'json',
>     'format.fail-on-missing-field' = 'true'
> );
> It reports the same error.
>
> -----Original Message-----
> From: wind.fly....@outlook.com <wind.fly....@outlook.com>
> Sent: May 18, 2020 9:45
> To: user-zh@flink.apache.org
> Subject: Flink SQL dimension-table join reports: Temporal table join currently only supports 'FOR
> SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
>
> Hi, all:
>        I am using Flink 1.10.0 with the BlinkPlanner, and I join dimension
> tables using the LEFT JOIN FOR SYSTEM_TIME AS OF syntax:
>        select TUMBLE_END(l.dt, INTERVAL '30' SECOND) as index_time,
>               l.extra_info['cityCode'] as city_code,
>               v.vehicle_level as vehicle_level,
>               CAST(COUNT(DISTINCT req_body['driverId']) as STRING) as index_value
>        from x.log.yanfa_log AS l
>        LEFT JOIN x.saic_auth_user.t_driver FOR SYSTEM_TIME AS OF l.proctime AS d
>               ON l.req_body['driverId'] = d.uid
>        LEFT JOIN x.saic_cms_config.t_vehicle FOR SYSTEM_TIME AS OF l.proctime AS v
>               ON d.vin=v.vehicle_vin
>        where l.ret_code = '0'
>          and l.servicename = 'MatchGtw.uploadLocationV4'
>          and l.req_body['appId'] = 'saic_card'
>        GROUP BY TUMBLE(l.dt, INTERVAL '30' SECOND), l.extra_info['cityCode'],
>                 v.vehicle_level;
>        The CREATE TABLE statement uses computed columns:
>        CREATE TABLE x.log.yanfa_log (
>     dt TIMESTAMP(3),
>     conn_id STRING,
>     sequence STRING,
>     trace_id STRING,
>     span_info STRING,
>     service_id STRING,
>     msg_id STRING,
>     servicename STRING,
>     ret_code STRING,
>     duration STRING,
>     req_body MAP<STRING,STRING>,
>     res_body MAP<STRING,STRING>,
>     extra_info MAP<STRING,STRING>,
>     proctime TIMESTAMP(3),
>     WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
> ) WITH (
>     'connector.type' = 'kafka',
>     'connector.version' = '0.11',
>     'connector.topic' = 'x-log-yanfa_log',
>     'connector.properties.bootstrap.servers' = '******:9092',
>     'connector.properties.zookeeper.connect' = '******:2181',
>     'connector.startup-mode' = 'latest-offset',
>     'update-mode' = 'append',
>     'format.type' = 'json',
>     'format.fail-on-missing-field' = 'true'
> );
> The following exception is reported:
>        Caused by: org.apache.flink.table.api.TableException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
>        at org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>        at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:118)
>        at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:131)
>        at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>        at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>        at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>        at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>        at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>        at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>        at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>        at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>        at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>        at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>        at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>        at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
>        at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
>        at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>        ... 20 more
>
>
>

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
