Fwd: Flink: OPTIONS hint in a SELECT fails after creating a view
Thanks for the reply. However, the lib directory in my deployment environment does not contain the package you mentioned — which package should I remove? The jars under my lib:

flink-csv-1.13.0.jar
flink-dist_2.11-1.13.0.jar
flink-json-1.13.0.jar
flink-shaded-zookeeper-3.4.14.jar
flink-sql-connector-mysql-cdc-2.1.1.jar
flink-table_2.11-1.13.0.jar
flink-table-blink_2.11-1.13.0.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar

-----Original Message-----
From: godfrey he [mailto:godfre...@gmail.com]
Sent: 16 February 2022 16:47
To: user-zh
Subject: Re: Flink: OPTIONS hint in a SELECT fails after creating a view

Hi liangjinghong,

The cause is that the blink planner introduces and modifies the SqlTableRef class, while the legacy planner does not include it, so the SqlTableRef from Calcite (which has the problem) gets loaded instead. Solution: if you only use the blink planner, you can remove the legacy planner jar from lib.

Best,
Godfrey

liangjinghong wrote on Mon, 14 Feb 2022 at 17:26:

> Hi all, the following code runs in the development environment but fails after packaging and deployment:
>
> Code:
>
> CREATE VIEW used_num_common
>     (toolName, region, type, flavor, used_num)
> AS
> select info.toolName as toolName, r.regionName as region, f.type, f.flavor, count(1) as used_num
> from tbl_schedule_job /*+ OPTIONS('server-id'='1001-1031') */ job
> join tbl_schedule_task /*+ OPTIONS('server-id'='2001-2031') */ task
>     on job.jobId = task.jobId
> join tbl_broker_node /*+ OPTIONS('server-id'='3001-3031') */ node
>     on task.nodeId = node.id
> join tbl_app_info /*+ OPTIONS('server-id'='4001-4031') */ info
>     on job.appId = info.appId
> join tbl_region r
>     on node.region /*+ OPTIONS('server-id'='5001-5031') */ = r.region
> join tbl_flavor /*+ OPTIONS('server-id'='6001-6031') */ f
>     on node.resourcesSpec = f.flavor
> where job.jobStatus in ('RUNNING','ERROR','INITING')
>     and task.taskStatus in ('RUNNING','ERROR','INITING')
>     and node.machineStatus <> 'DELETED'
>     and toolName is not null
> group by info.toolName, r.regionName, f.type, f.flavor
>
> …
>
> After deployment it fails with:
>
> The main method caused an error: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL
>
> 2022-02-08 13:33:39,350 WARN org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL
> …
> Caused by: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL
> …
Flink: OPTIONS hint in a SELECT fails after creating a view
Hi all, the following code runs in the development environment but fails after packaging and deployment:

Code:

CREATE VIEW used_num_common
    (toolName, region, type, flavor, used_num)
AS
select info.toolName as toolName, r.regionName as region, f.type, f.flavor, count(1) as used_num
from tbl_schedule_job /*+ OPTIONS('server-id'='1001-1031') */ job
join tbl_schedule_task /*+ OPTIONS('server-id'='2001-2031') */ task
    on job.jobId = task.jobId
join tbl_broker_node /*+ OPTIONS('server-id'='3001-3031') */ node
    on task.nodeId = node.id
join tbl_app_info /*+ OPTIONS('server-id'='4001-4031') */ info
    on job.appId = info.appId
join tbl_region r
    on node.region /*+ OPTIONS('server-id'='5001-5031') */ = r.region
join tbl_flavor /*+ OPTIONS('server-id'='6001-6031') */ f
    on node.resourcesSpec = f.flavor
where job.jobStatus in ('RUNNING','ERROR','INITING')
    and task.taskStatus in ('RUNNING','ERROR','INITING')
    and node.machineStatus <> 'DELETED'
    and toolName is not null
group by info.toolName, r.regionName, f.type, f.flavor

…

After deployment it fails with:

The main method caused an error: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL

2022-02-08 13:33:39,350 WARN org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL
    at org.apache.calcite.util.Util.needToImplement(Util.java:1075) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlSyntax$6.unparse(SqlSyntax.java:116) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlOperator.unparse(SqlOperator.java:329) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:101) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at org.apache.calcite.sql.SqlSelectOperator.unparse(SqlSelectOperator.java:176) ~[flink-table_2.11-1.13.0.jar:1.13.0]
    at …
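As a side note, separate from the planner-jar question: in the view above, one OPTIONS hint is attached to a column reference inside a join condition (node.region /*+ ... */ = r.region) rather than to a table. In Flink SQL, dynamic table options follow the table name in the FROM/JOIN clause, before its alias. A minimal sketch of the usual placement, reusing the table names and hint values from the post (the selected columns are just for illustration):

```sql
-- Sketch: the /*+ OPTIONS(...) */ hint attaches to the table reference,
-- before the alias -- not to a column inside the ON condition.
select node.id, r.regionName
from tbl_broker_node /*+ OPTIONS('server-id'='3001-3031') */ node
join tbl_region /*+ OPTIONS('server-id'='5001-5031') */ r
    on node.region = r.region;
```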
Question: result data stops updating after setting table.exec.state.ttl
Hi, I am new to Flink. For state management I set configuration.setString("table.exec.state.ttl","12h"); in my code. However, after the job had been running for 12 hours, my updating result table never updated again. From the web UI I can see that the Records Sent counters of my sources and some operators keep growing, the job shows no exceptions, and checkpoints succeed, so I am puzzled about where the problem is.

[screenshots: web UI operator metrics]

My SQL statements:

CREATE TABLE `tbl_rpt_app_usage` (
    `datepoint` TIMESTAMP,
    `appId` VARCHAR,
    `flavor` VARCHAR,
    `applyNum` BIGINT,
    `releaseNum` BIGINT,
    `usedNum` BIGINT,
    PRIMARY KEY (`datepoint`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '',
    'port' = '3306',
    'username' = '',
    'password' = '',
    'database-name' = '',
    'table-name' = ''
);

CREATE TABLE `temporary_usage` (
    `datepoint` TIMESTAMP,
    `appId` VARCHAR,
    `flavor` VARCHAR,
    `applyNum` BIGINT,
    `releaseNum` BIGINT,
    `usedNum` BIGINT,
    PRIMARY KEY (`datepoint`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '',
    'port' = '',
    'username' = '',
    'password' = '',
    'database-name' = '',
    'table-name' = ''
);

CREATE TABLE `tbl_cce_cluster` (
    `clusterId` VARCHAR,
    `region` VARCHAR,
    `flavor` VARCHAR,
    `totalNode` BIGINT,
    `toolName` VARCHAR,
    `appId` VARCHAR,
    PRIMARY KEY (`clusterId`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '',
    'port' = '3306',
    'username' = '',
    'password' = '',
    'database-name' = '',
    'table-name' = ''
);

--- MySQL sink table
CREATE table sink (
    code STRING,
    name STRING,
    usedcnt BIGINT,
    `time` TIMESTAMP,
    type STRING,
    PRIMARY KEY (`time`) NOT ENFORCED
) with (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql: ',
    'username' = '',
    'password' = '',
    'table-name' = 'sink'
);

(Since MySQL CDC does not seem to support filtering rows during synchronization, I currently sync first, create views that filter the results, and then compute on those — is there a better way?)

CREATE VIEW rpt (datepoint, appId, flavor, applyNum, releaseNum, usedNum) AS
select * from tbl_rpt_app_usage
where datepoint > '2022-02-11 14:54:01' and appId not in ('aaa')
union all
select * from temporary_usage

CREATE VIEW cce (clusterId, region, flavor, totalNode, toolName, appId) AS
select * from tbl_cce_cluster where toolName in ('bbb','ccc')

--- Job logic:
insert into sink
select code, name, usedcnt, LOCALTIMESTAMP as `time`, type from (
    select info.code, info.name, sum(rpt.usedNum) as usedcnt, 'online' type
    from (
        select * from (
            select *, ROW_NUMBER() OVER (PARTITION by appId, flavor order by datepoint desc) as row_num
            from rpt /*+ OPTIONS('server-id'='1001-1005') */
        ) where row_num = 1
    ) rpt
    join info on info.appid = rpt.appId
    group by info.code, info.name
)
union all
select code, name, usedcnt, LOCALTIMESTAMP as `time`, type from (
    select info.code, info.name, sum(totalNode) as usedcnt, 'online' type
    from cce /*+ OPTIONS('server-id'='1006-1010') */
    join info on cce.appId = info.appid
    group by info.code, info.name
)

My result table has never updated since 12 hours (my configured expiry time) after [the time shown in the screenshot].

[screenshot]

One more question: does the group-by operator currently only support updating its rows in the target database? The business side wants access to historical data; the only approach I can think of is to copy the full result table into another table every minute so history can be tracked — is there a better solution?

Thank you very much for reading and for any help!
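For context on the behavior described above (my understanding, not stated in the thread): table.exec.state.ttl drops a key's operator state once that key has been idle for the configured duration, and it applies to all stateful operators in the job — including the ROW_NUMBER deduplication and GROUP BY aggregations used here — so after a key's state expires, its previously emitted results can no longer be retracted or updated, which matches results "freezing" after exactly the TTL. The same option can also be set per job in the SQL client; a sketch, assuming Flink 1.13-era syntax:

```sql
-- Sketch (Flink SQL client): set state TTL for subsequently submitted jobs.
-- Note: a TTL shorter than the longest gap between updates for a key means
-- that key's aggregation/dedup state is dropped, after which its results
-- stop updating. Older SQL client versions use the unquoted form
-- (SET table.exec.state.ttl=12 h;).
SET 'table.exec.state.ttl' = '12 h';
```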
Question: joining three or more temporal tables fails with "mismatched type $6 TIMESTAMP(3)" after adding a WHERE clause
Hi, for a business requirement (Flink 1.13, MySQL CDC 2.1.1) I need to join three tables that satisfy the temporal-table structure. Without a WHERE clause on the join result it runs fine; after adding the WHERE clause it fails as follows:

SQL:

insert into sink
select count(1) as machine
from tbl_schedule_job as job
join tbl_schedule_task FOR SYSTEM_TIME AS OF job.lastUpdateTime as t
    on t.jobId = job.jobId
FOR SYSTEM_TIME AS OF t.lastModifiedTime AS n
where job.jobStatus in ('RUNNING','INITING','ERROR')

Error:

Exception in thread "main" java.lang.AssertionError: mismatched type $6 TIMESTAMP(3)
    at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2710)
    at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2688)
    at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
    at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:33)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:268)
    at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:238)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:256)
    at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1811)
    at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:189)
    at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:377)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271)
    at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
    at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
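One observation on the statement as posted: the second FOR SYSTEM_TIME AS OF clause has no table reference attached to it, so only two tables actually appear. Each temporal join is normally written as JOIN <table> FOR SYSTEM_TIME AS OF <time attribute of the left input> with its own ON condition. A sketch of the three-way shape — tbl_third and its join key are hypothetical placeholders, since the post does not show the third table:

```sql
-- Sketch: chaining two event-time temporal joins; tbl_third and n.taskId
-- are placeholders (the real third table is not shown in the post).
insert into sink
select count(1) as machine
from tbl_schedule_job as job
join tbl_schedule_task FOR SYSTEM_TIME AS OF job.lastUpdateTime as t
    on t.jobId = job.jobId
join tbl_third FOR SYSTEM_TIME AS OF t.lastModifiedTime as n
    on n.taskId = t.taskId
where job.jobStatus in ('RUNNING','INITING','ERROR');
```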