How to set the job name when submitting a Flink job to YARN in per-job mode
As the subject says: how should I set the job name for a Flink job submitted to YARN?
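For reference, a minimal sketch of the two usual knobs (my own sketch, not from the thread; verify against your Flink version): the string passed to execute() becomes the job name shown in the Flink web UI, while the YARN application name for per-job mode can, as far as I know, be set with the 1.11-era CLI option -ynm, e.g. bin/flink run -m yarn-cluster -ynm my-yarn-app-name ...

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class JobNameExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3).print();
            // The argument to execute() is the job name displayed in the web UI.
            env.execute("my-job-name");
        }
    }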
How to obtain the DAG graph shown in the Flink web UI
I'd like to obtain the DAG graph that the Flink web UI displays. Is there a way to get it?
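Two options (not from the thread, but standard Flink facilities): the graph behind the web UI is available as JSON from the monitoring REST API via GET http://<jobmanager>:8081/jobs/<job-id>/plan for a running job, or programmatically before submission; the JSON can be rendered at https://flink.apache.org/visualizer/. A sketch:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(1, 2, 3).print();
    // Prints the job graph as JSON; paste it into the Flink plan visualizer
    // (https://flink.apache.org/visualizer/) to render the DAG.
    System.out.println(env.getExecutionPlan());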
Re: Flink 1.11.2 web UI job detail page shows no received/sent data for the job
If all of your operators are chained into a single node, you will not see any input/output figures.

> On Oct 13, 2020, at 17:22, chenxuying wrote:
> 
> The cluster is a local standalone setup and the job runs normally; the sink is print and the output shows up in stdout,
> but the job detail page shows no real-time figures for Bytes Received, Records Received, Bytes Sent, Records Sent;
> they are all 0
Re: Flink 1.11.2 web UI job detail page shows no received/sent data for the job
Shouldn't it rather be that the source has no input metrics?

> On Oct 13, 2020, at 17:39, hailongwang <18868816...@163.com> wrote:
> 
> Hi chenxuying,
> 
> If your operators are all chained into one operator, then there are no such figures: the Source has no output metrics and the Sink has no output metrics, so once everything is chained together there is nothing to show.
> If you want to see them, set different parallelisms for the operators, or call disableChain explicitly.
> 
> Best,
> Hailong Wang
> 
> At 2020-10-13 16:22:41, "chenxuying" wrote:
>> The cluster is a local standalone setup and the job runs normally; the sink is print and the output shows up in stdout,
>> but the job detail page shows no real-time figures for Bytes Received, Records Received, Bytes Sent, Records Sent;
>> they are all 0
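A minimal sketch of the disableChain suggestion above (assuming env is a StreamExecutionEnvironment; the socket source is just an illustration): breaking the chain introduces a network exchange between operators, so the received/sent metrics become visible in the web UI.

    DataStream<String> lines = env.socketTextStream("localhost", 9999);
    lines
        .map(String::toUpperCase)
        .disableChaining()  // break the chain here: map is no longer fused with the sink
        .print();
    // Alternatively, disable chaining for the whole job:
    // env.disableOperatorChaining();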
Checkpoint timeout when using flink-CDC
I am evaluating the Flink SQL CDC feature. My use case is a left join across three CDC tables; since all three tables change over time, I use CDC for all of them. The first two tables have about 200,000 rows each and the last one only a few dozen; I join the three tables into a wide table and write it to MySQL. The job fails every time after roughly 100,000 rows have been written, and the logs show the cause is a checkpoint timeout. The state backend is RocksDB. The data volume is not that large, so why would a checkpoint time out, i.e. fail to complete within 10 minutes?
Re: Checkpoint timeout when using flink-CDC
1. I found that the checkpoint for the small table completes quickly; only the tables with about 200,000 rows never finish before the timeout. The volume is not large, but the processing speed seems very slow: writing to MySQL runs at roughly 200 rows/s.
2. It looks like the initial full snapshot in CDC has to finish processing the entire table before a checkpoint can complete (afterwards, incremental checkpoints should just record the binlog offset). So when a table is large enough, the checkpoint can never complete. Can I increase the checkpoint timeout? I could not find that option in the configuration page of the official docs.

> On Nov 3, 2020, at 8:54 AM, zhisheng wrote:
> 
> hi
> 
> I'd suggest checking two things:
> 
> 1. Check the checkpoint size: is it very large?
> 
> 2. Check whether the job is backpressured. Under backpressure, checkpoints rarely succeed; in that case, solve the backpressure problem first.
> 
> Best
> zhisheng
> 
> 丁浩浩 <18579099...@163.com> wrote on Mon, Nov 2, 2020 at 4:08 PM:
> 
>> I am evaluating the Flink SQL CDC feature. My use case is a left join across three CDC tables; since all three tables change over time, I use CDC for all of them.
>> The first two tables have about 200,000 rows each and the last one only a few dozen; I join the three tables into a wide table and write it to MySQL.
>> The job fails every time after roughly 100,000 rows have been written, and the logs show the cause is a checkpoint timeout. The state backend is RocksDB.
>> The data volume is not that large, so why would a checkpoint time out, i.e. fail to complete within 10 minutes?
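For the record, the checkpoint timeout is configurable (the 10 minutes seen above is the default). A sketch using the DataStream-level CheckpointConfig, assuming env is a StreamExecutionEnvironment; in flink-conf.yaml the equivalent key is execution.checkpointing.timeout:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10 * 60 * 1000L);                        // trigger a checkpoint every 10 minutes
    env.getCheckpointConfig().setCheckpointTimeout(60 * 60 * 1000L); // allow up to 1 hour before a checkpoint is declared failed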
Error when submitting a Flink SQL job
The job reads MySQL CDC, joins the streams, and writes the result back to MySQL. Every submission prints the error below, yet the job is still submitted to the cluster correctly and runs successfully. What is the cause?

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
    at com.gaotu.data.performance.flink.job.sql.CeresCanRenewalWide.main(CeresCanRenewalWide.java:150)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    ... 8 more

bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 92273) on host bjhldeMacBook-Pro.local.
Stopping standalonesession daemon (pid: 92004) on host bjhldeMacBook-Pro.local.
bjhldeMacBook-Pro:flink-1.11.2 dinghh$ vim conf/flink-conf.yaml
bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-x
-bash: bin/start-x: No such file or directory
bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host bjhldeMacBook-Pro.local.
Starting taskexecutor daemon on host bjhldeMacBook-Pro.local.
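A likely explanation (my inference from the symptom; the thread itself contains no answer): in Flink 1.11, TableEnvironment.executeSql("INSERT ...") submits the job by itself, so a trailing StreamExecutionEnvironment.execute() finds no DataStream operators and throws exactly this exception, while the SQL job has already been submitted and runs. A sketch, assuming tableEnv and env were created together:

    // Submits the INSERT job immediately and returns a TableResult.
    tableEnv.executeSql("INSERT INTO sink_table SELECT id, name FROM source_table");

    // Do NOT also call env.execute() for a pure SQL pipeline; with no
    // DataStream operators registered it throws
    // "No operators defined in streaming topology. Cannot execute."
    // env.execute();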
Flink SQL: using IN on a TINYINT column fails
When I connect to a MySQL table with Flink SQL CDC and the table's type column is TINYINT, using type in (1,2,3,4,5) produces the error below; only after changing the column type to INT does IN work. Is this expected? Doesn't Flink SQL convert automatically when the types don't match?

[ERROR] Could not execute SQL statement. Reason:
org.codehaus.commons.compiler.CompileException: Line 6, Column 88: No applicable constructor/method found for actual parameters "int"; candidates are: "org.apache.flink.table.runtime.util.collections.ByteHashSet()"
Re: Flink SQL: using IN on a TINYINT column fails
Are there plans to add implicit type conversion?

> On Nov 10, 2020, at 2:35 PM, Jark Wu wrote:
> 
> Yes. Flink does not support implicit type conversion yet.
> 
> Best,
> Jark
> 
> On Tue, 10 Nov 2020 at 14:28, hailongwang <18868816...@163.com> wrote:
> 
>> Hi,
>> 
>> Judging from your error, IN does not support an implicit CAST here.
>> You can either define type as INT, or CAST the values to TINYINT.
>> 
>> Best,
>> Hailong Wang
>> 
>> At 2020-11-10 10:41:47, "丁浩浩" <18579099...@163.com> wrote:
>>> When I connect to a MySQL table with Flink SQL CDC and the table's type column is TINYINT, using type in(1,2,3,4,5) produces the error below; only after changing the column type to INT does IN work. Is this expected? Doesn't Flink SQL convert automatically when the types don't match?
>>> 
>>> [ERROR] Could not execute SQL statement. Reason:
>>> org.codehaus.commons.compiler.CompileException: Line 6, Column 88: No applicable constructor/method found for actual parameters "int"; candidates are: "org.apache.flink.table.runtime.util.collections.ByteHashSet()"
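A sketch of the explicit-CAST workaround suggested above (table name is a placeholder; the column name follows the question; assumes tableEnv is a StreamTableEnvironment):

    Table result = tableEnv.sqlQuery(
        "SELECT * FROM my_cdc_table " +
        "WHERE `type` IN (CAST(1 AS TINYINT), CAST(2 AS TINYINT), CAST(3 AS TINYINT))");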
Re: Flink SQL: using IN on a TINYINT column fails
For example, when I ingest a MySQL table with Flink CDC, I usually take the MySQL DDL, tweak it slightly, and create the table in Flink SQL; type-conversion issues never come to mind, so I simply reuse the MySQL column types, and the same goes for the SQL itself. The same syntactically valid SQL runs in MySQL but cannot run in Flink SQL. Explicit casting works, of course, but automatic conversion would improve usability.

> On Nov 10, 2020, at 2:51 PM, Danny Chan wrote:
> 
> Not yet. In what scenario do you need implicit types?
> 
> 丁浩浩 <18579099...@163.com> wrote on Tue, Nov 10, 2020 at 2:45 PM:
> 
>> Are there plans to add implicit type conversion?
>> 
>>> On Nov 10, 2020, at 2:35 PM, Jark Wu wrote:
>>> 
>>> Yes. Flink does not support implicit type conversion yet.
>>> 
>>> Best,
>>> Jark
>>> 
>>> On Tue, 10 Nov 2020 at 14:28, hailongwang <18868816...@163.com> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> Judging from your error, IN does not support an implicit CAST here.
>>>> You can either define type as INT, or CAST the values to TINYINT.
>>>> 
>>>> Best,
>>>> Hailong Wang
>>>> 
>>>> At 2020-11-10 10:41:47, "丁浩浩" <18579099...@163.com> wrote:
>>>>> When I connect to a MySQL table with Flink SQL CDC and the table's type column is TINYINT, using type in(1,2,3,4,5) produces the error below; only after changing the column type to INT does IN work. Is this expected? Doesn't Flink SQL convert automatically when the types don't match?
>>>>> 
>>>>> [ERROR] Could not execute SQL statement. Reason:
>>>>> org.codehaus.commons.compiler.CompileException: Line 6, Column 88: No applicable constructor/method found for actual parameters "int"; candidates are: "org.apache.flink.table.runtime.util.collections.ByteHashSet()"
flink cdc: column not found after a MySQL table column was altered
After a column of the MySQL table was altered, re-ingesting the table with flink cdc and querying it reports that the column does not exist.

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 53, column 15 to line 53, column 20: Column 'rounds' not found in table 'prcrs'
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
    at com.gaotu.data.performance.flink.job.sql.RegularAnalysis.main(RegularAnalysis.java:234)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 53, column 15 to line 53, column 20: Column 'rounds' not found in table 'prcrs'
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
    at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5976)
    at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
    at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
    at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
    at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:884)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6009)
    at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50)
    at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
    at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
    at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:884)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6009)
    at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50)
    at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5583)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3312)
    at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3302)
    at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
    at org.apache.calcite.sql.v
Flink SQL: error when selecting a NULL value
Select
    id,
    name,
    if(type=1,2,null)
From
    user;

When I execute the SQL above, I get:
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of 'NULL'
Is it impossible to output null?
Re: Flink SQL: error when selecting a NULL value
Thanks a lot!!!

> On Nov 10, 2020, at 8:22 PM, hailongwang <18868816...@163.com> wrote:
> 
> Hi,
> 
> You need to CAST the null to a concrete type, for example:
> if(type=1,2,cast(null as int))
> 
> Best,
> Hailong
> At 2020-11-10 19:14:44, "丁浩浩" <18579099...@163.com> wrote:
>> Select
>> id,
>> name,
>> if(type=1,2,null)
>> From
>> user;
>> When I execute the SQL above, I get:
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of 'NULL'
>> Is it impossible to output null?
Flink SQL CDC: lock wait timeout
When I use flink cdc to run a join query over several tables, one of the tables keeps hitting a lock wait timeout, so the job cannot start properly. How should this be handled?

org.apache.kafka.connect.errors.ConnectException: Lock wait timeout exceeded; try restarting transaction Error code: 1205; SQLSTATE: 40001.
    at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
    at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
    at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock wait timeout exceeded; try restarting transaction
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:123)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
    at com.mysql.cj.jdbc.StatementImpl.executeInternal(StatementImpl.java:782)
    at com.mysql.cj.jdbc.StatementImpl.execute(StatementImpl.java:666)
    at io.debezium.jdbc.JdbcConnection.executeWithoutCommitting(JdbcConnection.java:1201)
    at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:465)
    ... 3 more
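One hedged workaround (an assumption based on Debezium options that, to my knowledge, the mysql-cdc connector passes through with the debezium. prefix; verify against your connector version): skip the locks that the snapshot reader acquires, at the cost of weaker consistency guarantees during the initial snapshot. All names and connection details below are placeholders; assumes tableEnv is a TableEnvironment.

    tableEnv.executeSql(
        "CREATE TABLE my_cdc_table (" +
        "  id BIGINT," +
        "  name STRING" +
        ") WITH (" +
        "  'connector' = 'mysql-cdc'," +
        "  'hostname' = 'localhost'," +
        "  'port' = '3306'," +
        "  'username' = 'flinkuser'," +
        "  'password' = 'flinkpw'," +
        "  'database-name' = 'test'," +
        "  'table-name' = 'my_table'," +
        // pass-through Debezium option: do not acquire locks during the snapshot
        "  'debezium.snapshot.locking.mode' = 'none'" +
        ")");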
Flink CDC: MySQL DATETIME value 0000-00-00 00:00:00 is converted by Flink to 1970-01-01T00:00
When a MySQL column is DATETIME and its value is 0000-00-00 00:00:00, it gets converted to 1970-01-01T00:00. What should I do to keep the output consistent with the original value?

Output:
2> (true,1,zhangsan,18,1970-01-01T00:00)
3> (true,2,lisi,20,2020-11-11T14:17:46)
4> (true,3,wangwu,99,1970-01-01T00:00)
1> (true,4,zhaoliu,77,1970-01-01T00:00)

Log messages:
2020-11-11 14:30:37,418 - 19755 WARN [debezium-mysqlconnector-mysql_binlog_source-snapshot] io.debezium.connector.mysql.MySqlValueConverters:852 - Invalid value '0000-00-00 00:00:00' stored in column 'rigist_time' of table 'test.boos' converted to empty value
2020-11-11 14:30:37,424 - 19761 WARN [debezium-mysqlconnector-mysql_binlog_source-snapshot] io.debezium.connector.mysql.MySqlValueConverters:852 - Invalid value '0000-00-00 00:00:00' stored in column 'rigist_time' of table 'test.boos' converted to empty value
2020-11-11 14:30:37,424 - 19761 WARN [debezium-mysqlconnector-mysql_binlog_source-snapshot] io.debezium.connector.mysql.MySqlValueConverters:852 - Invalid value '0000-00-00 00:00:00' stored in column 'rigist_time' of table 'test.boos' converted to empty value
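Since 0000-00-00 00:00:00 is not a valid timestamp, Debezium substitutes an empty/epoch value, so the original string cannot be carried through as a TIMESTAMP. A hedged workaround of my own (not from the thread; table and column names follow the log above) is to map the epoch marker back to NULL downstream:

    Table cleaned = tableEnv.sqlQuery(
        "SELECT id, name, age, " +
        "       CASE WHEN rigist_time = TIMESTAMP '1970-01-01 00:00:00' " +
        "            THEN CAST(NULL AS TIMESTAMP) ELSE rigist_time END AS rigist_time " +
        "FROM boos");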
Flink CDC: very high latency when joining multiple tables
I use flink cdc to run join queries over MySQL tables, and I found that Flink can only join two tables at a time and then join the result with the next table, so the latency of the final writes to the database is very high. Is there a good optimization to mitigate this?
Re: Flink CDC: very high latency when joining multiple tables
select
    ri.sub_clazz_number,
    prcrs.rounds,
    count(*) as num
from
    subclazz gs
JOIN
    (SELECT gce.number, min( gce.extension_value ) AS grade FROM course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4 GROUP BY gce.number) AS temp
ON
    temp.number = gs.course_number AND temp.grade>30
JOIN
    right_info ri
ON
    gs.number = ri.sub_clazz_number
join
    wide_subclazz ws
on
    ws.number = ri.sub_clazz_number
join
    course gc
on
    gc.number = ws.course_number and gc.course_category_id in (30,40)
left join
    performance_regular_can_renewal_sign prcrs
on prcrs.order_number = ri.order_number and prcrs.rounds in (1,2)
where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
and not exists (select 1 from internal_staff gis where gis.user_id = ri.user_id)
and not exists (select 1 from clazz_extension ce where ws.clazz_number = ce.number
    and ce.extension_type = 3 and ce.isdel = 0
    and ce.extension_value in (1,3,4,7,8,11))
group by ri.sub_clazz_number, prcrs.rounds

This is the SQL. The bottleneck is on the join operators: the operators whose checkpoints never complete are always joins.

> On Nov 14, 2020, at 5:53 PM, Jark Wu wrote:
> 
> Could you show your code? Are you using the lookup (temporal table) join syntax (FOR SYSTEM TIME AS OF)?
> We need to pinpoint which operator is actually slow.
> 
> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
> 
>> I use flink cdc to run join queries over MySQL tables, and I found that Flink can only join two tables at a time and then join the result with the next table, so the latency of the final writes to the database is very high.
>> Is there a good optimization to mitigate this?
Flink SQL: cannot use !=
I want to use the inequality operator in a WHERE condition, but it fails with the error below. Isn't != the inequality operator in Flink SQL?

[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.runtime.CalciteException: Bang equal '!=' is not allowed under the current SQL conformance level
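Under the default conformance level, the standard SQL inequality operator <> does work. A one-line sketch (table and column names are placeholders; assumes tableEnv is a StreamTableEnvironment):

    // '<>' is standard SQL and is accepted where '!=' is rejected.
    Table t = tableEnv.sqlQuery("SELECT id, name FROM user_info WHERE grade <> 1");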
Re: Flink CDC: very high latency when joining multiple tables
Even after I rewrote the not exists as joins, the checkpoints of the join operators still cannot complete. Is my checkpoint interval too short? I trigger a checkpoint every half hour, and the timeout is also half an hour.
Screenshots below (my image uploads never seem to come through):
https://imgchr.com/i/DeqixU
https://imgchr.com/i/DeqP2T

> On Nov 16, 2020, at 10:29 AM, Jark Wu wrote:
> 
> The bottleneck should be the two not exists. not exists currently runs with a single parallelism, so its performance cannot scale out.
> Consider replacing not exists with something else, such as a UDF or a lookup (dimension table) join.
> 
> Best,
> Jark
> 
> On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote:
> 
>> select
>>    ri.sub_clazz_number,
>>    prcrs.rounds,
>>    count(*) as num
>> from
>>    subclazz gs
>> JOIN
>>    (SELECT gce.number, min( gce.extension_value ) AS grade FROM
>> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4 GROUP
>> BY gce.number) AS temp
>> ON
>>    temp.number = gs.course_number AND temp.grade>30
>> JOIN
>>    right_info ri
>> ON
>>    gs.number = ri.sub_clazz_number
>> join
>>    wide_subclazz ws
>> on
>>    ws.number = ri.sub_clazz_number
>> join
>>    course gc
>> on
>>    gc.number = ws.course_number and gc.course_category_id in (30,40)
>> left join
>>    performance_regular_can_renewal_sign prcrs
>> on prcrs.order_number = ri.order_number and prcrs.rounds in (1,2)
>> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
>> and not exists (select 1 from internal_staff gis where gis.user_id =
>> ri.user_id)
>> and not exists (select 1 from clazz_extension ce where ws.clazz_number =
>> ce.number
>>    and ce.extension_type = 3 and ce.isdel = 0
>>    and ce.extension_value in (1,3,4,7,8,11))
>> group by ri.sub_clazz_number, prcrs.rounds
>> This is the SQL. The bottleneck is on the join operators: the operators whose checkpoints never complete are always joins.
>> 
>>> On Nov 14, 2020, at 5:53 PM, Jark Wu wrote:
>>> 
>>> Could you show your code? Are you using the lookup (temporal table) join syntax (FOR SYSTEM TIME AS OF)?
>>> We need to pinpoint which operator is actually slow.
>>> 
>>> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
>>> 
>>>> I use flink cdc to run join queries over MySQL tables, and I found that Flink can only join two tables at a time and then join the result with the next table, so the latency of the final writes to the database is very high.
>>>> Is there a good optimization to mitigate this?
Re: Flink CDC: very high latency when joining multiple tables
All the tables in my job are CDC MySQL tables joined with each other.

> On Nov 18, 2020, at 1:07 PM, hailongwang <18868816...@163.com> wrote:
> 
> Sorry, I misphrased that.
> Is your requirement to join stream data with the CDC mysql streams against each other, or to use the CDC mysql stream to look up dimension-table data?
> 
> At 2020-11-18 11:59:52, "hailongwang" <18868816...@163.com> wrote:
>> Judging from your SQL and the operator names in the screenshots, you are using regular stream JOINs [1] rather than lookup (temporal table) joins [2]?
>> Is your requirement to join stream data with the CDC mysql data against each other, or to look up the CDC mysql data from one side only?
>> If it is a stream JOIN, also check whether a skewed join key is overloading a single task and causing the checkpoint to fail.
>> 
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#regular-joins
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#join-with-a-temporal-table
>> At 2020-11-18 10:34:48, "Jark Wu" wrote:
>>> Also, you can increase the parallelism of the join operators to improve join throughput.
>>> 
>>> On Wed, 18 Nov 2020 at 11:34, Jark Wu wrote:
>>> 
>>>> My guess is that your full snapshot is too large and the join operators are too slow, so the snapshot is not fully processed within half an hour and the checkpoint times out.
>>>> Note that no checkpoint can complete during the full-snapshot phase. See point 4 of this article:
>>>> https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ
>>>> 
>>>> The article also gives the remedy: in flink-conf.yaml, configure a tolerated number of failed checkpoints and a restart strategy, as follows:
>>>> 
>>>> execution.checkpointing.interval: 10min # checkpoint interval
>>>> execution.checkpointing.tolerable-failed-checkpoints: 100 # tolerated checkpoint failures
>>>> restart-strategy: fixed-delay # restart strategy
>>>> restart-strategy.fixed-delay.attempts: 2147483647 # restart attempts
>>>> 
>>>> Best,
>>>> Jark
>>>> 
>>>> On Wed, 18 Nov 2020 at 11:28, 丁浩浩 <18579099...@163.com> wrote:
>>>> 
>>>>> Even after I rewrote the not exists as joins, the checkpoints of the join operators still cannot complete. Is my checkpoint interval too short? I trigger a checkpoint every half hour, and the timeout is also half an hour.
>>>>> Screenshots below (my image uploads never seem to come through):
>>>>> https://imgchr.com/i/DeqixU
>>>>> https://imgchr.com/i/DeqP2T
>>>>> 
>>>>>> On Nov 16, 2020, at 10:29 AM, Jark Wu wrote:
>>>>>> 
>>>>>> The bottleneck should be the two not exists. not exists currently runs with a single parallelism, so its performance cannot scale out.
>>>>>> Consider replacing not exists with something else, such as a UDF or a lookup (dimension table) join.
>>>>>> 
>>>>>> Best,
>>>>>> Jark
>>>>>> 
>>>>>> On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote:
>>>>>> 
>>>>>>> select
>>>>>>>    ri.sub_clazz_number,
>>>>>>>    prcrs.rounds,
>>>>>>>    count(*) as num
>>>>>>> from
>>>>>>>    subclazz gs
>>>>>>> JOIN
>>>>>>>    (SELECT gce.number, min( gce.extension_value ) AS grade FROM
>>>>>>> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4
>>>>> GROUP
>>>>>>> BY gce.number) AS temp
>>>>>>> ON
>>>>>>>    temp.number = gs.course_number AND temp.grade>30
>>>>>>> JOIN
>>>>>>>    right_info ri
>>>>>>> ON
>>>>>>>    gs.number = ri.sub_clazz_number
>>>>>>> join
>>>>>>>    wide_subclazz ws
>>>>>>> on
>>>>>>>    ws.number = ri.sub_clazz_number
>>>>>>> join
>>>>>>>    course gc
>>>>>>> on
>>>>>>>    gc.number = ws.course_number and gc.course_category_id in (30,40)
>>>>>>> left join
>>>>>>>    performance_regular_can_renewal_sign prcrs
>>>>>>> on prcrs.order_number = ri.order_number and prcrs.rounds in (1,2)
>>>>>>> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
>>>>>>> and not exists (select 1 from internal_staff gis where gis.user_id =
>>>>>>> ri.user_id)
>>>>>>> and not exists (select 1 from clazz_extension ce where ws.clazz_number
>>>>> =
>>>>>>> ce.number
>>>>>>>    and ce.extension_type = 3 and ce.isdel = 0
>>>>>>>    and ce.extension_value in (1,3,4,7,8,11))
>>>>>>> group by ri.sub_clazz_number, prcrs.rounds
>>>>>>> This is the SQL. The bottleneck is on the join operators: the operators whose checkpoints never complete are always joins.
>>>>>>> 
>>>>>>>> On Nov 14, 2020, at 5:53 PM, Jark Wu wrote:
>>>>>>>> 
>>>>>>>> Could you show your code? Are you using the lookup (temporal table) join syntax (FOR SYSTEM TIME AS OF)?
>>>>>>>> We need to pinpoint which operator is actually slow.
>>>>>>>> 
>>>>>>>> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
>>>>>>>> 
>>>>>>>>> I use flink cdc to run join queries over MySQL tables, and I found that Flink can only join two tables at a time and then join the result with the next table, so the latency of the final writes to the database is very high.
>>>>>>>>> Is there a good optimization to mitigate this?
Flink CDC snapshot scan fails with "Query execution was interrupted, maximum statement execution time exceeded"
I have a large table with over 100 million rows, and the scan fails partway through: it reports that the maximum execution time was exceeded. How should this be solved?

2020-11-18 17:07:43,658 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 7: - 10403 of 131508640 rows scanned from table 'test.right_info' after 01:00:00.952
2020-11-18 17:07:44,131 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 7: - 10404 of 131508640 rows scanned from table 'test.right_info' after 01:00:01.425
2020-11-18 17:07:44,601 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 7: committing transaction
2020-11-18 17:07:44,601 ERROR io.debezium.connector.mysql.SnapshotReader [] - Failed due to error: Aborting snapshot due to error when last running 'SELECT * FROM `test`.`right_info`': Query execution was interrupted, maximum statement execution time exceeded
org.apache.kafka.connect.errors.ConnectException: Query execution was interrupted, maximum statement execution time exceeded Error code: 3024; SQLSTATE: HY000.
    at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
    at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
    at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_91]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_91]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.sql.SQLException: Query execution was interrupted, maximum statement execution time exceeded
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
    at com.mysql.cj.jdbc.result.ResultSetImpl.next(ResultSetImpl.java:1739) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
    at io.debezium.connector.mysql.SnapshotReader.lambda$execute$14(SnapshotReader.java:648) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
    at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:473) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
    at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:641) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
    ... 3 more
2020-11-18 17:07:44,606 INFO io.debezium.connector.common.BaseSourceTask [] - Stopping down connector
2020-11-18 17:07:44,606 INFO io.debezium.connector.mysql.MySqlConnectorTask [] - Stopping MySQL connector task
2020-11-18 17:07:44,606 INFO io.debezium.connector.mysql.ChainedReader [] - ChainedReader: Stopping the snapshot reader
2020-11-18 17:07:44,607 INFO io.debezium.connector.mysql.SnapshotReader [] - Discarding 4363 unsent record(s) due to the connector shutting down
2020-11-18 17:07:44,607 INFO io.debezium.connector.mysql.SnapshotReader [] - Discarding 0 unsent record(s) due to the connector shutting down
2020-11-18 17:07:44,608 INFO io.debezium.connector.mysql.MySqlConnectorTask [] - Connector task finished all work and is now shutdown
2020-11-18 17:07:44,609 ERROR com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction [] - Reporting error:
org.apache.kafka.connect.errors.ConnectException: Query execution was interrupted, maximum statement execution time exceeded Error code: 3024; SQLSTATE: HY000.
    at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
    at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
    at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_91]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_91]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.sql.SQLException: Query execution was interrupted, maximum statement execution time exceeded
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) ~[flink-connector-mysql-cdc-1.1.
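Error code 3024 comes from MySQL's server-side max_execution_time limit killing the snapshot's long-running SELECT, not from Flink itself. A hedged sketch of one fix (my own suggestion, not from the thread; assumes MySQL 5.7.8+, sufficient privileges, and placeholder connection details): raise or disable the limit for the server, or at least for the account the connector logs in with. The value is in milliseconds; 0 disables the limit.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class RaiseStatementTimeout {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/test", "root", "password");
                 Statement stmt = conn.createStatement()) {
                // 0 disables MySQL's SELECT execution-time limit globally
                // (requires SUPER / SYSTEM_VARIABLES_ADMIN privileges).
                stmt.execute("SET GLOBAL max_execution_time = 0");
            }
        }
    }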
Flink SQL custom UDAF fails with "No match found for function signature count_uadf(<NUMERIC>)"
Flink version: 1.11.1
The UDAF code comes from the Alibaba Cloud documentation.

The code:

public class TestSql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env);
        //env.setParallelism(3);
        tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class);

        Properties configs = CommonUtils.getConfigs();
        // register the clazz source table
        FlinkUtils.registerMysqlTable2FlinkTable(
                tableEnv, configs.getProperty("url"),
                configs.getProperty("user.name"),
                configs.getProperty("password"),
                "test", "clazz_lesson");

        Table table = tableEnv.sqlQuery("select count_uadf(clazz_number),clazz_number from clazz_lesson group by clazz_number");
        //Table table = tableEnv.sqlQuery("select number,collect(extension_value) from clazz_extension group by number ");
        tableEnv.toRetractStream(table, Row.class).print();
        env.execute();
    }
}

public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
    // Data structure of the accumulator that holds the count UDAF's state.
    public static class CountAccum {
        public long total;
    }

    @Override
    // Initialize the count UDAF's accumulator.
    public CountAccum createAccumulator() {
        CountAccum acc = new CountAccum();
        acc.total = 0;
        return acc;
    }

    @Override
    // getValue computes the count UDAF's result from the accumulator holding the state.
    public Long getValue(CountAccum accumulator) {
        return accumulator.total;
    }

    // accumulate updates the count UDAF's state-holding accumulator with the input data.
    public void accumulate(CountAccum accumulator, Long iValue) {
        accumulator.total++;
    }

    public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
        for (CountAccum other : its) {
            accumulator.total += other.total;
        }
    }
}

The stack trace:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 8 to line 1, column 31: No match found for function signature count_uadf(<NUMERIC>)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
    at com.gaotu.data.performance.flink.job.sql.TestSql.main(TestSql.java:34)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 31: No match found for function signature count_uadf(<NUMERIC>)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.valida
Re: Flink SQL custom UDAF fails with "No match found for function signature count_uadf(<NUMERIC>)"
I have solved the problem myself.

> On Dec 17, 2020, at 9:00 PM, 丁浩浩 <18579099...@163.com> wrote:
> 
> Flink version: 1.11.1
> The UDAF code comes from the Alibaba Cloud documentation.
Questions about the flink-connector-jdbc source code
Flink version: 1.11

1. The JDBC connector page of the 1.11 docs says: "Notice that scan.partition.lower-bound and scan.partition.upper-bound are just used to decide the partition stride, not for filtering the rows in table." That is inconsistent with what I found in the source code: scan.partition.lower-bound and scan.partition.upper-bound do cause rows to be filtered out. Today I looked at the 1.13 docs, and the sentence has been changed to "Notice that scan.partition.lower-bound and scan.partition.upper-bound are used to decide the partition stride and filter the rows in table." Does that mean the sentence in the 1.11 docs was wrong?

2. The InputFormat interface carries this comment: "IMPORTANT NOTE: Input formats must be written such that an instance can be opened again after it was closed. That is due to the fact that the input format is used for potentially multiple splits. After a split is done, the format's close function is invoked and, if another split is available, the open function is invoked afterwards for the next split."
My question: if I set scan.partition.num (the number of partitions) to 10 and the parallelism to 2, each slot holds one InputFormat reading data; once one partition has been fully read, close is called, then open is called again, and the next partition is read. Is that understanding correct?
And if the partition count is 10 and the parallelism is 3, how are the partitions assigned to the slots' InputFormats?
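For context, a sketch of a partitioned JDBC scan (all names and connection details are placeholders; the option keys follow the 1.11 JDBC connector docs). With lower-bound 0, upper-bound 100 and scan.partition.num 10, the scan is divided into 10 splits of stride 10 over the partition column, each becoming one WHERE range. As for assignment, my understanding (not a guarantee) is that splits are handed out lazily by the JobManager on request, so with 10 splits and parallelism 3 each parallel instance keeps requesting the next unassigned split, going through one open/close cycle per split, until all 10 are consumed; the distribution is demand-driven rather than a fixed round-robin.

    tableEnv.executeSql(
        "CREATE TABLE orders_jdbc (" +
        "  id BIGINT," +
        "  amount DOUBLE" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:mysql://localhost:3306/test'," +
        "  'table-name' = 'orders'," +
        "  'username' = 'root'," +
        "  'password' = 'password'," +
        "  'scan.partition.column' = 'id'," +      // must be a numeric, date, or timestamp column
        "  'scan.partition.num' = '10'," +         // 10 splits -> stride = (100 - 0) / 10
        "  'scan.partition.lower-bound' = '0'," +
        "  'scan.partition.upper-bound' = '100'" +
        ")");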