Fwd: Error when using OPTIONS hints in the SELECT statement of a Flink view

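2022-02-16 Post by liangjinghong
Thank you for your reply. However, the lib directory in my deployment environment does not contain the package you mentioned. Which package should I remove?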

Jars in my lib directory:
flink-csv-1.13.0.jar
flink-dist_2.11-1.13.0.jar
flink-json-1.13.0.jar
flink-shaded-zookeeper-3.4.14.jar
flink-sql-connector-mysql-cdc-2.1.1.jar
flink-table_2.11-1.13.0.jar
flink-table-blink_2.11-1.13.0.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar
-----Original Message-----
From: godfrey he [mailto:godfre...@gmail.com]
Sent: February 16, 2022 16:47
To: user-zh
Subject: Re: Error when using OPTIONS hints in the SELECT statement of a Flink view

Hi liangjinghong,

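The cause is that the blink planner includes its own modified copy of the SqlTableRef class, but the legacy planner does not include SqlTableRef. As a result, Calcite's original SqlTableRef (which has the bug) gets loaded instead.
Solution: if you only use the blink planner, you can remove the legacy planner jar from lib.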

Best,
Godfrey
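To answer "which jar should I remove?" on a concrete installation, it can help to list every jar in lib that bundles the class Godfrey mentions. The sketch below is not from this thread. It uses only the JDK's java.util.jar API. The class name JarClassFinder and the default path "lib" are hypothetical.

In the stack trace quoted in this thread, the Calcite classes are reported as loaded from flink-table_2.11-1.13.0.jar. Any conclusion about which jar holds the legacy planner is an assumption to verify against the scan output.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;

/**
 * Lists the jars in a directory (for example Flink's lib/) that contain a given class,
 * to spot duplicate copies of a class such as org.apache.calcite.sql.SqlTableRef.
 */
final class JarClassFinder {

    /** Converts "a.b.C" into the jar entry name "a/b/C.class". */
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns the file names of all *.jar files in libDir that contain className, sorted. */
    static List<String> jarsContaining(Path libDir, String className) throws IOException {
        String entry = entryName(className);
        List<String> hits = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jar : jars) {
                try (JarFile jf = new JarFile(jar.toFile())) {
                    if (jf.getEntry(entry) != null) {
                        hits.add(jar.getFileName().toString());
                    }
                }
            }
        }
        hits.sort(null); // natural (alphabetical) order for stable output
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical defaults: pass the real lib path and class name as arguments.
        Path lib = Paths.get(args.length > 0 ? args[0] : "lib");
        String cls = args.length > 1 ? args[1] : "org.apache.calcite.sql.SqlTableRef";
        for (String jar : jarsContaining(lib, cls)) {
            System.out.println(jar);
        }
    }
}
```

Run it with the Flink lib directory and org.apache.calcite.sql.SqlTableRef as arguments, and it prints each jar that carries a copy of that class. More than one hit means class-loading order decides which copy wins.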

On Mon, Feb 14, 2022 at 17:26, liangjinghong wrote:


Error when using OPTIONS hints in the SELECT statement of a Flink view

2022-02-14 Post by liangjinghong
Hello everyone, the following code runs in my development environment but fails after being packaged and deployed:
Code:
CREATE VIEW used_num_common
(toolName,region,type,flavor,used_num)
AS
select info.toolName as toolName,r.regionName as 
region,f.type,f.flavor,count(1) as used_num from
tbl_schedule_job/*+ OPTIONS('server-id'='1001-1031') */ job
join
tbl_schedule_task/*+ OPTIONS('server-id'='2001-2031') */ task
on job.jobId = task.jobId
join
tbl_broker_node/*+ OPTIONS('server-id'='3001-3031') */  node
on task.nodeId = node.id
join
tbl_app_info/*+ OPTIONS('server-id'='4001-4031') */ info
on job.appId = info.appId
join
tbl_region r
on node.region/*+ OPTIONS('server-id'='5001-5031') */ = r.region
join
tbl_flavor/*+ OPTIONS('server-id'='6001-6031') */  f
on node.resourcesSpec = f.flavor
where job.jobStatus in ('RUNNING','ERROR','INITING')
and task.taskStatus in ('RUNNING','ERROR','INITING')
and node.machineStatus <> 'DELETED'
and toolName is not null
group by info.toolName,r.regionName,f.type,f.flavor
…
After packaging and deployment, the following error is reported:
The main method caused an error: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL
2022-02-08 13:33:39,350 WARN org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL
at org.apache.calcite.util.Util.needToImplement(Util.java:1075) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlSyntax$6.unparse(SqlSyntax.java:116) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlOperator.unparse(SqlOperator.java:329) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:101) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlSelectOperator.unparse(SqlSelectOperator.java:176) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at 

Question: result data stops updating after setting table.exec.state.ttl

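2022-02-11 Post by liangjinghong
Hello, I am new to Flink. For state management, I set configuration.setString("table.exec.state.ttl","12h"); in my code.

However, after the job had been running for 12 hours, my updating result table stopped receiving updates. In the Web UI, Records Sent keeps growing for my sources and some operators, the job shows no exceptions, and checkpoints complete normally. So I cannot figure out where the problem is.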
Here are my SQL statements:
CREATE TABLE `tbl_rpt_app_usage`
 (`datepoint` TIMESTAMP,
  `appId` VARCHAR ,
  `flavor` VARCHAR ,
  `applyNum` BIGINT ,
  `releaseNum` BIGINT ,
  `usedNum` BIGINT ,
   PRIMARY KEY (`datepoint`) NOT ENFORCED
 )WITH(
 'connector' = 'mysql-cdc',
 'hostname' = '',
 'port' = '3306',
 'username' = '',
 'password' = '',
 'database-name' = '',
 'table-name' = ''
 );
CREATE TABLE `temporary_usage`
 (`datepoint` TIMESTAMP,
  `appId` VARCHAR ,
  `flavor` VARCHAR ,
  `applyNum` BIGINT ,
  `releaseNum` BIGINT ,
  `usedNum` BIGINT ,
   PRIMARY KEY (`datepoint`) NOT ENFORCED
 )WITH(
 'connector' = 'mysql-cdc',
 'hostname' = '',
 'port' = '',
 'username' = '',
 'password' = '',
 'database-name' = '',
 'table-name' = ''
 );
CREATE TABLE `tbl_cce_cluster`
 (`clusterId` VARCHAR ,
  `region` VARCHAR ,
  `flavor` VARCHAR ,
  `totalNode` BIGINT ,
  `toolName` VARCHAR ,
  `appId` VARCHAR ,
   PRIMARY KEY (`clusterId`) NOT ENFORCED
 )WITH(
 'connector' = 'mysql-cdc',
 'hostname' = '',
 'port' = '3306',
 'username' = '',
 'password' = '',
 'database-name' = '',
 'table-name' = ''
 );
--- MySQL sink table
CREATE table sink(
  code STRING,
  name STRING,
  usedcnt BIGINT,
  `time` TIMESTAMP,
  type STRING,
  PRIMARY KEY (`time`) NOT ENFORCED
) with (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql: ',
   'username' = '',
   'password' = '',
   'table-name' = 'sink'
);
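(MySQL CDC does not seem to support filtering specific rows during synchronization. So I currently sync the tables first, create views over the filtered results, and then compute on those views. Is there a better approach?)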
CREATE VIEW rpt
 (datepoint,appId,flavor,applyNum,releaseNum,usedNum)
 AS
 select * from tbl_rpt_app_usage
 where datepoint > '2022-02-11 14:54:01'
 and appId not in ('aaa')
 union all
 select * from temporary_usage;

CREATE VIEW cce
(clusterId,region,flavor,totalNode,toolName,appId)
AS
select * from tbl_cce_cluster
where toolName in ('bbb','ccc');

--- The query logic is as follows:
insert into sink
select code,name,usedcnt,LOCALTIMESTAMP as `time`,type from(
select
info.code,
info.name,
sum(rpt.usedNum) as usedcnt,'online' type
from (
select * from (
select *,ROW_NUMBER() OVER(PARTITION by appId,flavor order by datepoint desc) 
as row_num from rpt/*+ OPTIONS('server-id'='1001-1005') */
)where row_num =1
)rpt
join info
on info.appid=rpt.appId
group by info.code,info.name)
union all
select code,name,usedcnt,LOCALTIMESTAMP as `time`,type from(
select
info.code,
info.name,
sum(totalNode) as usedcnt,'online' type
from
cce/*+ OPTIONS('server-id'='1006-1010') */
join info
on cce.appId=info.appid
group by info.code,info.name)
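The first branch of the INSERT uses the ROW_NUMBER() ... WHERE row_num = 1 pattern. It keeps only the most recent row per (appId, flavor) by datepoint, then sums usedNum.

The Java sketch below is a batch model of that logic only, useful for checking expected results by hand. It simplifies in one assumed way: it groups by appId instead of joining to info, whose definition is not shown. LatestPerKeySum is a hypothetical name.

In the streaming job, both the deduplication and the join keep state, and table.exec.state.ttl applies to that state.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Batch model of: keep the latest row per (appId, flavor) ordered by datepoint
 * (ROW_NUMBER() ... ORDER BY datepoint DESC, row_num = 1), then SUM(usedNum).
 * Simplification (assumption): sums per appId instead of joining to the info table.
 */
final class LatestPerKeySum {

    /** One rpt row; datepoint is a plain long (e.g. epoch millis) for simplicity. */
    static final class Row {
        final String appId;
        final String flavor;
        final long datepoint;
        final long usedNum;

        Row(String appId, String flavor, long datepoint, long usedNum) {
            this.appId = appId;
            this.flavor = flavor;
            this.datepoint = datepoint;
            this.usedNum = usedNum;
        }
    }

    static Map<String, Long> sumLatestUsedNum(List<Row> rows) {
        // Step 1: Top-1 per (appId, flavor); the newest datepoint wins, ties keep the first row seen.
        Map<List<String>, Row> latest = new HashMap<>();
        for (Row r : rows) {
            List<String> key = List.of(r.appId, r.flavor);
            Row current = latest.get(key);
            if (current == null || r.datepoint > current.datepoint) {
                latest.put(key, r);
            }
        }
        // Step 2: SUM(usedNum) over the surviving rows, grouped by appId (TreeMap for sorted output).
        Map<String, Long> sums = new TreeMap<>();
        for (Row r : latest.values()) {
            sums.merge(r.appId, r.usedNum, Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("a", "x", 1, 5),
                new Row("a", "x", 2, 7), // newer than the row above, so it replaces it
                new Row("a", "y", 1, 3),
                new Row("b", "x", 3, 4));
        System.out.println(sumLatestUsedNum(rows)); // {a=10, b=4}
    }
}
```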
My result table has not been updated since 12 hours (the expiration time I set) after [screenshot].
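One possible explanation, not confirmed in this thread. Flink documents table.exec.state.ttl as the minimum time that idle state (state that was not updated) is retained. In a regular join, the info table (whose definition is not shown) may rarely change. If it receives no new rows, its side of the join state can expire after 12 hours. After that, new rpt or cce rows find no matching info row and produce no output, even though records keep flowing into the join.

The sketch models only that mechanism, under two assumptions:
- the TTL clock restarts on writes but not on reads, as with StateTtlConfig.UpdateType.OnCreateAndWrite;
- expired values are never returned.

TtlStateModel is a hypothetical name, and this is not Flink's implementation.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Simplified model of keyed state with a time-to-live (TTL):
 * the TTL clock restarts only when a key is written, reads do not refresh it,
 * and expired values are never returned. Not Flink's implementation.
 */
final class TtlStateModel<K, V> {

    /** A stored value plus the (processing) time of its last write. */
    private static final class Entry<T> {
        final T value;
        final long lastWriteMillis;

        Entry(T value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<K, Entry<V>> state = new HashMap<>();

    TtlStateModel(Duration ttl) {
        this.ttlMillis = ttl.toMillis();
    }

    /** Creating or overwriting a key restarts its TTL clock. */
    void put(K key, V value, long nowMillis) {
        state.put(key, new Entry<>(value, nowMillis));
    }

    /** Returns the value if it has not expired; reading does NOT restart the TTL clock. */
    Optional<V> get(K key, long nowMillis) {
        Entry<V> e = state.get(key);
        if (e == null) {
            return Optional.empty();
        }
        if (nowMillis - e.lastWriteMillis >= ttlMillis) {
            state.remove(key); // expired: drop it and behave as if it was never there
            return Optional.empty();
        }
        return Optional.of(e.value);
    }

    public static void main(String[] args) {
        long hour = Duration.ofHours(1).toMillis();
        // Hypothetical "info" side of a join: one row written at t = 0 and never updated.
        TtlStateModel<String, String> infoByAppId = new TtlStateModel<>(Duration.ofHours(12));
        infoByAppId.put("app-1", "code-1", 0L);
        // Rows from the other join input keep probing the info state over time.
        System.out.println(infoByAppId.get("app-1", 11 * hour)); // Optional[code-1]
        System.out.println(infoByAppId.get("app-1", 13 * hour)); // Optional.empty
    }
}
```

In main, the lookup at hour 11 still finds the row. The lookup at hour 13 does not, even though the key was read in between, because reads do not refresh the TTL in this model.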

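I also have another question: can GROUP BY results written to the database only be stored as updates? The business needs historical data. The only idea I have so far is to copy the whole result table into another table every minute so that the history can be traced. Is there a better solution?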


Thank you very much for reading and for your help!


Question: error "mismatched type $6 TIMESTAMP(3)" when joining three or more temporal tables with a WHERE condition

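2022-02-05 Post by liangjinghong
Hello, for a business requirement I need to join three tables that have a temporal-table structure, using Flink 1.13 and MySQL CDC 2.1.1. Without a WHERE condition on the join result, the job runs normally. After adding a WHERE condition, the following error is reported: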
SQL:
insert into sink
select count(1) as machine from tbl_schedule_job as job
join tbl_schedule_task FOR SYSTEM_TIME AS OF job.lastUpdateTime as t
on t.jobId = job.jobId FOR SYSTEM_TIME AS OF t.lastModifiedTime AS n
where job.jobStatus in ('RUNNING','INITING','ERROR')
Error:
Exception in thread "main" java.lang.AssertionError: mismatched type $6 TIMESTAMP(3)
 at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2710)
 at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2688)
 at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
 at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
 at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
 at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:33)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
 at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:268)
 at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:238)
 at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:256)
 at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1811)
 at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:189)
 at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:377)
 at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
 at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
 at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
 at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271)
 at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
 at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
 at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
 at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
 at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
 at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
 at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
 at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
 at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
 at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
 at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
 at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at scala.collection.immutable.Range.foreach(Range.scala:160)
 at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
 at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
 at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
 at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
 at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
 at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)