How to set the job name when submitting a Flink job to YARN in per-job mode

2020-09-30 Thread
As the subject says: how do I set the job name for a Flink job submitted to YARN in per-job mode?
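A minimal sketch of one common approach, not an authoritative answer: the Flink job name shown in the Web UI comes from the string passed to execute(), while the YARN application name in per-job mode is usually set separately on the CLI (for example with the -ynm / --yarnname option of bin/flink run, assuming that option is available in your version).

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobNameExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3).print();

        // The string passed to execute() becomes the Flink job name shown in the Web UI.
        env.execute("my-perjob-name");
    }
}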

How to obtain the DAG graph shown in the Flink Web UI

2020-10-12 Thread
I would like to obtain the DAG graph displayed in the Flink Web UI. Is there any way to get it?
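A minimal sketch of one way to get the underlying plan: on the client side, getExecutionPlan() returns the JSON plan of the topology that the Web UI renders as the DAG; for a running job, the REST endpoint GET /jobs/<jobId>/plan should return the same JSON structure. The pipeline below is only illustrative.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DagPlanExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
           .filter(s -> !s.isEmpty())
           .print();

        // Prints the JSON execution plan of the current topology; the Web UI DAG
        // is essentially a rendering of this structure.
        System.out.println(env.getExecutionPlan());
    }
}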

Re: Flink 1.11.2 Web UI: job detail page shows no received/sent data for the job

2020-10-13 Thread
If all of your operators are chained into a single node, you won't see any input/output figures.

> On October 13, 2020, at 5:22 PM, chenxuying wrote:
> 
> The cluster runs in local standalone mode and the job runs normally. The sink is print, and I can see the data output in Stdout,
> but on the job detail page the real-time figures such as Bytes Received, Records Received, Bytes Sent, and Records Sent
> are all 0.



Re: Flink 1.11.2 Web UI: job detail page shows no received/sent data for the job

2020-10-13 Thread
A source shouldn't have input metrics, I think.

> On October 13, 2020, at 5:39 PM, hailongwang <18868816...@163.com> wrote:
> 
> Hi chenxuying,
> 
> 
> If all of your operators are chained into a single operator, then there are no such metrics. A Source has no input metrics and a Sink has no
> output metrics, so once everything is chained together nothing is shown.
> If you want to see them, you can give the operators different parallelism, or call disableChaining explicitly.
> 
> 
> Best,
> Hailong Wang
> 
> 
> On 2020-10-13 16:22:41, "chenxuying" wrote:
>> The cluster runs in local standalone mode and the job runs normally. The sink is print, and I can see the data output in Stdout,
>> but on the job detail page the real-time figures such as Bytes Received, Records Received, Bytes Sent, and Records Sent
>> are all 0.
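A minimal sketch of the advice above: breaking the chain (or giving operators different parallelism) makes the per-operator Records/Bytes Sent and Received metrics visible again. The pipeline below is illustrative only.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainMetricsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3, 4, 5)
           .filter(i -> i > 0)
           // Breaking the chain here splits the pipeline into separate nodes,
           // so sent/received metrics are reported between them.
           .disableChaining()
           .print();

        env.execute("chain-metrics-demo");
    }
}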



Checkpoint timeout when using flink-CDC

2020-11-02 Thread
I have just started evaluating the Flink SQL CDC feature. My requirement is a left join across three CDC tables; since all three tables change, I use CDC for all of them.
The first two tables each have roughly 200,000 rows and the last one only a few dozen; I join the three tables into a wide table and write it to MySQL.
Every time around 100,000 rows have been written the job fails. The logs show the cause is a checkpoint timeout. The state backend is RocksDB.
The data volume is not that large, so why does the checkpoint time out, i.e. fail to complete within 10 minutes?



Re: Checkpoint timeout when using flink-CDC

2020-11-02 Thread
1. I noticed that the checkpoint for the small table completes very quickly; only the tables with about 200,000 rows never complete before the timeout. The data volume is not large,
but the processing speed seems too slow: roughly 200 rows/s are written to MySQL.
2. It seems that during the initial full snapshot, CDC has to finish processing all of the table's data before a checkpoint can complete (subsequent incremental checkpoints should just be based on the binlog
offset).
So when my table is relatively large, the checkpoint can never complete. Can I increase the checkpoint timeout? I don't seem to find that option in the configuration page of the official docs.
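A minimal sketch of how the checkpoint timeout can be raised programmatically; the interval and timeout values below are illustrative, and the equivalent flink-conf.yaml key should be execution.checkpointing.timeout.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 30 minutes ...
        env.enableCheckpointing(30 * 60 * 1000L);
        // ... and allow each checkpoint up to 60 minutes before it is declared timed out.
        env.getCheckpointConfig().setCheckpointTimeout(60 * 60 * 1000L);

        env.fromElements(1, 2, 3).print();
        env.execute("checkpoint-timeout-demo");
    }
}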

> On November 3, 2020, at 8:54 AM, zhisheng wrote:
> 
> hi
> 
> I'd suggest investigating two things:
> 
> 1. Check the checkpoint size: is it very large?
> 
> 2. Check whether the job is backpressured. Under backpressure, checkpoints usually have a hard time succeeding; in that case, fix the backpressure first.
> 
> Best
> zhisheng
> 
> 丁浩浩 <18579099...@163.com> wrote on Monday, November 2, 2020, at 4:08 PM:
> 
>> I have just started evaluating the Flink SQL CDC feature. My requirement is a left join across three CDC tables; since all three tables change, I use CDC for all of them.
>> The first two tables each have roughly 200,000 rows and the last one only a few dozen; I join the three tables into a wide table and write it to MySQL.
>> Every time around 100,000 rows have been written the job fails. The logs show the cause is a checkpoint timeout. The state backend is RocksDB.
>> The data volume is not that large, so why does the checkpoint time out, i.e. fail to complete within 10 minutes?
>> 
>> 



Error when submitting a Flink SQL job

2020-11-04 Thread
The job reads MySQL CDC streams, joins them, and writes the result back to MySQL. Every time I submit it the error below is reported, yet the job is still submitted to the cluster correctly and runs successfully.
What could be the reason?

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: No operators defined in streaming topology. Cannot execute.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.IllegalStateException: No operators defined in streaming 
topology. Cannot execute.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
at 
com.gaotu.data.performance.flink.job.sql.CeresCanRenewalWide.main(CeresCanRenewalWide.java:150)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 8 more
bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/stop-cluster.sh 
Stopping taskexecutor daemon (pid: 92273) on host bjhldeMacBook-Pro.local.
Stopping standalonesession daemon (pid: 92004) on host bjhldeMacBook-Pro.local.
bjhldeMacBook-Pro:flink-1.11.2 dinghh$ vim conf/flink-conf.yaml 
bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-x
-bash: bin/start-x: No such file or directory
bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host bjhldeMacBook-Pro.local.
Starting taskexecutor daemon on host bjhldeMacBook-Pro.local.
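One common cause that matches these symptoms (the error appears, yet the job is submitted and runs) is that the pipeline is already submitted through the Table API, for example via executeSql("INSERT INTO ..."), and a trailing env.execute() then finds an empty DataStream topology. A minimal sketch with placeholder tables, not the original job:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class InsertOnlyPipeline {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Placeholder tables standing in for the MySQL CDC source and the MySQL sink.
        tableEnv.executeSql(
                "CREATE TABLE source_table (id INT, name STRING) WITH ('connector' = 'datagen', 'rows-per-second' = '1')");
        tableEnv.executeSql(
                "CREATE TABLE sink_table (id INT, name STRING) WITH ('connector' = 'print')");

        // executeSql on an INSERT statement already submits its own Flink job.
        tableEnv.executeSql("INSERT INTO sink_table SELECT id, name FROM source_table");

        // Do NOT also call env.execute() here: no DataStream operators were added to `env`,
        // so it would fail with "No operators defined in streaming topology. Cannot execute."
        // even though the INSERT job above was submitted and runs.
    }
}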

Flink SQL: error when using IN on a TINYINT column

2020-11-09 Thread
When I connect to a MySQL table with Flink SQL CDC and the `type` column in MySQL is TINYINT, using type
in(1,2,3,4,5) produces the error below. Only after changing the column type to INT can I use IN. Is this expected? When the types don't match,
doesn't Flink SQL convert them automatically?

[ERROR] Could not execute SQL statement. Reason:
org.codehaus.commons.compiler.CompileException: Line 6, Column 88: No 
applicable constructor/method found for actual parameters "int"; candidates 
are: "org.apache.flink.table.runtime.util.collections.ByteHashSet()

Re: Flink SQL: error when using IN on a TINYINT column

2020-11-09 Thread
Is there any plan to add implicit automatic type conversion?

> On November 10, 2020, at 2:35 PM, Jark Wu wrote:
> 
> Yes. Flink does not support implicit type conversion yet.
> 
> Best,
> Jark
> 
> On Tue, 10 Nov 2020 at 14:28, hailongwang <18868816...@163.com> wrote:
> 
>> Hi,
>> 
>> 
>> From your error, IN does not support an implicit CAST.
>> You can either define `type` as INT, or CAST the values on the right-hand side to TINYINT.
>> 
>> 
>> Best,
>> Hailong Wang
>> 
>> On 2020-11-10 10:41:47, "丁浩浩" <18579099...@163.com> wrote:
>>> When I connect to a MySQL table with Flink SQL CDC and the `type` column in MySQL is TINYINT, using type
>>> in(1,2,3,4,5) produces the error below. Only after changing the column type to INT can I use IN. Is this expected? When the types don't match,
>>> doesn't Flink SQL convert them automatically?
>>> 
>>> [ERROR] Could not execute SQL statement. Reason:
>>> org.codehaus.commons.compiler.CompileException: Line 6, Column 88: No
>> applicable constructor/method found for actual parameters "int"; candidates
>> are: "org.apache.flink.table.runtime.util.collections.ByteHashSet()
>> 
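A minimal sketch of the workaround suggested above (cast the literals to TINYINT, or declare the column as INT); the table name, columns, and the datagen placeholder source are assumptions standing in for the MySQL CDC table.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class TinyintInExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical source with a TINYINT column, standing in for the MySQL CDC table.
        tableEnv.executeSql(
                "CREATE TABLE my_table (id INT, name STRING, `type` TINYINT) WITH ('connector' = 'datagen')");

        // Casting the literals to TINYINT avoids the implicit-conversion error;
        // declaring `type` as INT in the DDL would work as well.
        Table result = tableEnv.sqlQuery(
                "SELECT id, name FROM my_table " +
                "WHERE `type` IN (CAST(1 AS TINYINT), CAST(2 AS TINYINT), CAST(3 AS TINYINT))");
        result.execute().print();
    }
}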




Re: Flink SQL: error when using IN on a TINYINT column

2020-11-09 Thread
For example, when I ingest a MySQL table with Flink CDC, I usually take the MySQL DDL, tweak it slightly, and create the table in Flink SQL; I normally keep the MySQL column types
without thinking about type conversion, and the SQL is reused in the same way. The same syntactically valid SQL runs in MySQL but cannot run in Flink SQL.
It can of course be made to work with explicit casts, but providing automatic conversion would improve usability.

> On November 10, 2020, at 2:51 PM, Danny Chan wrote:
> 
> Not for now. What scenario do you need implicit typing for?
> 
> 丁浩浩 <18579099...@163.com> wrote on Tuesday, November 10, 2020, at 2:45 PM:
> 
>> Is there any plan to add implicit automatic type conversion?
>> 
>>> On November 10, 2020, at 2:35 PM, Jark Wu wrote:
>>> 
>>> Yes. Flink does not support implicit type conversion yet.
>>> 
>>> Best,
>>> Jark
>>> 
>>> On Tue, 10 Nov 2020 at 14:28, hailongwang <18868816...@163.com> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> 
>>>> From your error, IN does not support an implicit CAST.
>>>> You can either define `type` as INT, or CAST the values on the right-hand side to TINYINT.
>>>> 
>>>> 
>>>> Best,
>>>> Hailong Wang
>>>> 
>>>> On 2020-11-10 10:41:47, "丁浩浩" <18579099...@163.com> wrote:
>>>>> When I connect to a MySQL table with Flink SQL CDC and the `type` column in MySQL is TINYINT, using type
>>>>> in(1,2,3,4,5) produces the error below. Only after changing the column type to INT can I use IN. Is this expected? When the types don't match,
>>>>> doesn't Flink SQL convert them automatically?
>>>>> 
>>>>> [ERROR] Could not execute SQL statement. Reason:
>>>>> org.codehaus.commons.compiler.CompileException: Line 6, Column 88: No
>>>> applicable constructor/method found for actual parameters "int";
>> candidates
>>>> are: "org.apache.flink.table.runtime.util.collections.ByteHashSet()
>>>> 
>> 
>> 
>> 




Flink CDC: after a MySQL table column is altered, Flink reports that the column is not found

2020-11-09 Thread
After a column in the MySQL table was altered, I ingest it again with Flink CDC, and whenever this table is used, Flink reports that the column does not exist.
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. From line 53, column 15 to line 53, column 20: Column 
'rounds' not found in table 'prcrs'
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
at 
com.gaotu.data.performance.flink.job.sql.RegularAnalysis.main(RegularAnalysis.java:234)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 53, 
column 15 to line 53, column 20: Column 'rounds' not found in table 'prcrs'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5976)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
at 
org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
at 
org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:884)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6009)
at 
org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50)
at 
org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
at 
org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:884)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6009)
at 
org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50)
at 
org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5583)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3312)
at 
org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3302)
at 
org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
at 
org.apache.calcite.sql.v

Flink SQL: error when selecting a NULL value

2020-11-10 Thread
Select 
id,
name,
if(type=1,2,null) 
From
user ;
When I execute the SQL above, I get:
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of 'NULL'
Is it that NULL cannot be displayed?

Re: Flink SQL: error when selecting a NULL value

2020-11-10 Thread
Thanks a lot!!!

> On November 10, 2020, at 8:22 PM, hailongwang <18868816...@163.com> wrote:
> 
> Hi,
> 
> 
> You need to cast the null to a concrete type, for example:
> if(type=1,2,cast(null as int))
> 
> 
> Best,
> Hailong
> On 2020-11-10 19:14:44, "丁浩浩" <18579099...@163.com> wrote:
>> Select 
>>  id,
>>  name,
>>  if(type=1,2,null) 
>> From
>>  user ;
>> When I execute the SQL above, I get:
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of 'NULL'
>> Is it that NULL cannot be displayed?
> 
> 
> 




Flink SQL CDC: lock wait timeout

2020-11-10 Thread
When I use Flink CDC to run join queries over multiple tables, one of the tables always hits a lock wait timeout, so the job cannot start properly.
How should this situation be handled?
org.apache.kafka.connect.errors.ConnectException: Lock wait timeout exceeded; 
try restarting transaction Error code: 1205; SQLSTATE: 40001.
at 
io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
at 
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
at 
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock 
wait timeout exceeded; try restarting transaction
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:123)
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at 
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at 
com.mysql.cj.jdbc.StatementImpl.executeInternal(StatementImpl.java:782)
at com.mysql.cj.jdbc.StatementImpl.execute(StatementImpl.java:666)
at 
io.debezium.jdbc.JdbcConnection.executeWithoutCommitting(JdbcConnection.java:1201)
at 
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:465)
... 3 more
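The exception comes from Debezium's snapshot phase acquiring locks. One commonly suggested mitigation is to relax the snapshot locking, assuming this mysql-cdc version passes 'debezium.*' options through to the embedded Debezium engine; all connection values and table/column names below are placeholders, and skipping locks is only safe if no schema changes happen during the snapshot.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SnapshotWithoutLocks {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(
                "CREATE TABLE orders_cdc (" +
                "  id BIGINT," +
                "  order_number STRING" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'localhost'," +
                "  'port' = '3306'," +
                "  'username' = 'flink'," +
                "  'password' = 'flink'," +
                "  'database-name' = 'test'," +
                "  'table-name' = 'orders'," +
                // Assumed pass-through option: skip global/table read locks during the snapshot phase.
                "  'debezium.snapshot.locking.mode' = 'none'" +
                ")");
    }
}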


Flink CDC: when a MySQL column is DATETIME, 0000-00-00 00:00:00 is converted by Flink to 1970-01-01T00:00

2020-11-10 Thread
When my MySQL column is DATETIME and its value is 0000-00-00 00:00:00, it gets converted to 1970-01-01T00:00. What should I do to keep the value consistent with the original?
Output:
2> (true,1,zhangsan,18,1970-01-01T00:00)
3> (true,2,lisi,20,2020-11-11T14:17:46)
4> (true,3,wangwu,99,1970-01-01T00:00)
1> (true,4,zhaoliu,77,1970-01-01T00:00)
Log messages:
2020-11-11 14:30:37,418 - 19755 WARN  
[debezium-mysqlconnector-mysql_binlog_source-snapshot] 
io.debezium.connector.mysql.MySqlValueConverters:852  - Invalid value 
'0000-00-00 00:00:00' stored in column 'rigist_time' of table 'test.boos' 
converted to empty value
2020-11-11 14:30:37,424 - 19761 WARN  
[debezium-mysqlconnector-mysql_binlog_source-snapshot] 
io.debezium.connector.mysql.MySqlValueConverters:852  - Invalid value 
'0000-00-00 00:00:00' stored in column 'rigist_time' of table 'test.boos' 
converted to empty value
2020-11-11 14:30:37,424 - 19761 WARN  
[debezium-mysqlconnector-mysql_binlog_source-snapshot] 
io.debezium.connector.mysql.MySqlValueConverters:852  - Invalid value 
'0000-00-00 00:00:00' stored in column 'rigist_time' of table 'test.boos' 
converted to empty value




Flink CDC: very high latency when joining multiple tables

2020-11-13 Thread
I use Flink CDC to run join queries over MySQL tables and found that Flink can only join two tables first and then join the result with the next table, which makes the latency of the final writes to the database very high.
Is there a good optimization approach to alleviate this?

Re: Flink CDC: very high latency when joining multiple tables

2020-11-15 Thread
select 
ri.sub_clazz_number, 
prcrs.rounds, 
count(*) as num 
from 
subclazz gs 
JOIN 
(SELECT gce.number, min( gce.extension_value ) AS grade FROM 
course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4 GROUP BY 
gce.number) AS temp 
ON 
temp.number = gs.course_number AND temp.grade>30 
JOIN 
right_info ri
ON  
gs.number = ri.sub_clazz_number 
join
wide_subclazz ws  
on
ws.number = ri.sub_clazz_number 
join 
course gc 
on 
gc.number = ws.course_number and gc.course_category_id in (30,40)  
left join 
performance_regular_can_renewal_sign prcrs 
on prcrs.order_number = ri.order_number and prcrs.rounds in (1,2) 
where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null 
and not exists (select 1 from internal_staff gis where gis.user_id = ri.user_id)
and not exists (select 1 from clazz_extension ce where ws.clazz_number = 
ce.number  
and ce.extension_type = 3 and ce.isdel = 0  
and ce.extension_value in (1,3,4,7,8,11))  
group by ri.sub_clazz_number, prcrs.rounds 
This is the SQL.
The bottleneck is on all the join nodes; the nodes whose checkpoints never complete are always the join nodes.

> On November 14, 2020, at 5:53 PM, Jark Wu wrote:
> 
> Can you show your code? Are you using the temporal-table (dimension) join syntax (FOR SYSTEM TIME AS OF)?
> We need to pin down which node is actually slow.
> 
> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
> 
>> I use Flink CDC to run join queries over MySQL tables and found that Flink can only join two tables first and then join the result with the next table, which makes the latency of the final writes very high.
>> Is there a good optimization approach to alleviate this?




Flink SQL: can't use !=

2020-11-15 Thread
I want to use the not-equal operator in a WHERE clause but I get an error. Isn't the not-equal operator in Flink SQL != ?
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.runtime.CalciteException: Bang equal '!=' is not allowed 
under the current SQL conformance level
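A minimal sketch of the alternative: '<>' is the SQL-standard inequality operator and should be accepted under Flink's default conformance level, whereas '!=' is rejected as the error above indicates. The VALUES-based table is only an illustration.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class NotEqualExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Use '<>' instead of '!=' in the WHERE clause.
        Table result = tableEnv.sqlQuery(
                "SELECT * FROM (VALUES (1), (2), (3)) AS t(id) WHERE id <> 2");
        result.execute().print();
    }
}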

Re: Flink CDC: very high latency when joining multiple tables

2020-11-17 Thread
Even after I changed the NOT EXISTS into joins, the join nodes' checkpoints still cannot complete. Is the checkpoint time I configured too short?
I trigger a checkpoint every half hour, and the timeout is also half an hour.
Screenshots below (for some reason my uploaded images never display):
https://imgchr.com/i/DeqixU
https://imgchr.com/i/DeqP2T

> On November 16, 2020, at 10:29 AM, Jark Wu wrote:
> 
> The bottleneck is probably the two NOT EXISTS. NOT EXISTS currently runs with a parallelism of 1 only, so it cannot scale out.
> Consider replacing the NOT EXISTS with another approach, e.g. a UDF or a dimension-table (lookup) join.
> 
> Best,
> Jark
> 
> On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote:
> 
>> select
>>ri.sub_clazz_number,
>>prcrs.rounds,
>>count(*) as num
>> from
>>subclazz gs
>> JOIN
>>(SELECT gce.number, min( gce.extension_value ) AS grade FROM
>> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4 GROUP
>> BY gce.number) AS temp
>> ON
>>temp.number = gs.course_number AND temp.grade>30
>> JOIN
>>right_info ri
>> ON
>>gs.number = ri.sub_clazz_number
>> join
>>wide_subclazz ws
>> on
>>ws.number = ri.sub_clazz_number
>> join
>>course gc
>> on
>>gc.number = ws.course_number and gc.course_category_id in (30,40)
>> left join
>>performance_regular_can_renewal_sign prcrs
>> on prcrs.order_number = ri.order_number and prcrs.rounds in (1,2)
>> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
>> and not exists (select 1 from internal_staff gis where gis.user_id =
>> ri.user_id)
>> and not exists (select 1 from clazz_extension ce where ws.clazz_number =
>> ce.number
>>and ce.extension_type = 3 and ce.isdel = 0
>>and ce.extension_value in (1,3,4,7,8,11))
>> group by ri.sub_clazz_number, prcrs.rounds
>> This is the SQL.
>> The bottleneck is on all the join nodes; the nodes whose checkpoints never complete are always the join nodes.
>> 
>>> On November 14, 2020, at 5:53 PM, Jark Wu wrote:
>>> 
>>> Can you show your code? Are you using the temporal-table (dimension) join syntax (FOR SYSTEM TIME AS OF)?
>>> We need to pin down which node is actually slow.
>>> 
>>> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
>>> 
>>>> I use Flink CDC to run join queries over MySQL tables and found that Flink can only join two tables first and then join the result with the next table, which makes the final write latency very high.
>>>> Is there a good optimization approach to alleviate this?
>> 
>> 
>> 




Re: Flink CDC: very high latency when joining multiple tables

2020-11-17 Thread
What I set up is entirely joins between CDC MySQL tables.


> On November 18, 2020, at 1:07 PM, hailongwang <18868816...@163.com> wrote:
> 
> Sorry, I phrased that wrong.
> Is your requirement to join the stream-table data with the CDC mysql stream on both sides, or to have the CDC mysql stream look up dimension-table data?
> 
> On 2020-11-18 11:59:52, "hailongwang" <18868816...@163.com> wrote:
>> Judging from your SQL and the operator names in the screenshots, you seem to be using a regular stream JOIN[1] rather than a dimension-table JOIN[2]?
>> Is your requirement to join the stream-table data and the CDC mysql data on both sides, or only to have the stream side look up the CDC mysql data?
>> If it is a regular stream JOIN, also check whether a skewed join key is putting pressure on a single task and preventing the checkpoint from succeeding.
>> 
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#regular-joins
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#join-with-a-temporal-table
>> On 2020-11-18 10:34:48, "Jark Wu" wrote:
>>> Also, you can increase the parallelism of the join nodes to improve join throughput.
>>> 
>>> On Wed, 18 Nov 2020 at 11:34, Jark Wu  wrote:
>>> 
>>>> Most likely your full-snapshot data is too large while the join nodes process it too slowly, so the full snapshot is not finished within half an hour and the checkpoint times out.
>>>> Note that no checkpoint can complete during the full-snapshot phase. See point 4 of this article for details.
>>>> https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ
>>>> 
>>>> The workaround is also mentioned in the article:
>>>> 
>>>> Workaround: in flink-conf.yaml, configure the number of tolerable failed checkpoints and a restart strategy, as follows:
>>>> 
>>>> execution.checkpointing.interval: 10min   # checkpoint interval
>>>> execution.checkpointing.tolerable-failed-checkpoints: 100  # tolerable number of failed checkpoints
>>>> restart-strategy: fixed-delay  # restart strategy
>>>> restart-strategy.fixed-delay.attempts: 2147483647   # number of restart attempts
>>>> 
>>>> Best,
>>>> Jark
>>>> 
>>>> On Wed, 18 Nov 2020 at 11:28, 丁浩浩 <18579099...@163.com> wrote:
>>>> 
>>>>> Even after I changed the NOT EXISTS into joins, the join nodes' checkpoints still cannot complete. Is my checkpoint time set too short? I trigger a checkpoint every half hour, and the timeout is also half an hour.
>>>>> Screenshots below (for some reason my uploaded images never display):
>>>>> https://imgchr.com/i/DeqixU
>>>>> https://imgchr.com/i/DeqP2T
>>>>> 
>>>>>> On November 16, 2020, at 10:29 AM, Jark Wu wrote:
>>>>>> 
>>>>>> The bottleneck is probably the two NOT EXISTS. NOT EXISTS currently runs with a parallelism of 1 only, so it cannot scale out.
>>>>>> Consider replacing the NOT EXISTS with another approach, e.g. a UDF or a dimension-table (lookup) join.
>>>>>> 
>>>>>> Best,
>>>>>> Jark
>>>>>> 
>>>>>> On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote:
>>>>>> 
>>>>>>> select
>>>>>>>   ri.sub_clazz_number,
>>>>>>>   prcrs.rounds,
>>>>>>>   count(*) as num
>>>>>>> from
>>>>>>>   subclazz gs
>>>>>>> JOIN
>>>>>>>   (SELECT gce.number, min( gce.extension_value ) AS grade FROM
>>>>>>> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4
>>>>> GROUP
>>>>>>> BY gce.number) AS temp
>>>>>>> ON
>>>>>>>   temp.number = gs.course_number AND temp.grade>30
>>>>>>> JOIN
>>>>>>>   right_info ri
>>>>>>> ON
>>>>>>>   gs.number = ri.sub_clazz_number
>>>>>>> join
>>>>>>>   wide_subclazz ws
>>>>>>> on
>>>>>>>   ws.number = ri.sub_clazz_number
>>>>>>> join
>>>>>>>   course gc
>>>>>>> on
>>>>>>>   gc.number = ws.course_number and gc.course_category_id in (30,40)
>>>>>>> left join
>>>>>>>   performance_regular_can_renewal_sign prcrs
>>>>>>> on prcrs.order_number = ri.order_number and prcrs.rounds in (1,2)
>>>>>>> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
>>>>>>> and not exists (select 1 from internal_staff gis where gis.user_id =
>>>>>>> ri.user_id)
>>>>>>> and not exists (select 1 from clazz_extension ce where ws.clazz_number
>>>>> =
>>>>>>> ce.number
>>>>>>>   and ce.extension_type = 3 and ce.isdel = 0
>>>>>>>   and ce.extension_value in (1,3,4,7,8,11))
>>>>>>> group by ri.sub_clazz_number, prcrs.rounds
>>>>>>> This is the SQL.
>>>>>>> The bottleneck is on all the join nodes; the nodes whose checkpoints never complete are always the join nodes.
>>>>>>> 
>>>>>>>> On November 14, 2020, at 5:53 PM, Jark Wu wrote:
>>>>>>>> 
>>>>>>>> Can you show your code? Are you using the temporal-table (dimension) join syntax (FOR SYSTEM TIME AS OF)?
>>>>>>>> We need to pin down which node is actually slow.
>>>>>>>> 
>>>>>>>> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
>>>>>>>> 
>>>>>>>>> I use Flink CDC to run join queries over MySQL tables and found that Flink can only join two tables first and then join the result with the next table, which makes the final write latency very high.
>>>>>>>>> Is there a good optimization approach to alleviate this?
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>>> 
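A programmatic equivalent of the flink-conf.yaml settings quoted above, as a sketch only; the numbers are illustrative and the specific API calls (setTolerableCheckpointFailureNumber, fixed-delay restart strategy) are assumed to be available in this Flink version.

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerantCheckpointConfig {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint interval: 10 minutes
        env.enableCheckpointing(10 * 60 * 1000L);
        // tolerate up to 100 failed checkpoints before failing the job
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(100);
        // fixed-delay restart strategy with a very large attempt count
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.seconds(10)));

        env.fromElements(1, 2, 3).print();
        env.execute("tolerant-checkpoint-demo");
    }
}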




Flink CDC: snapshot scan fails with "Query execution was interrupted, maximum statement execution time exceeded"

2020-11-18 Thread
I have a large table with over 100 million rows, but an error occurs during the snapshot scan: the maximum statement execution time is exceeded. How should this be solved?

2020-11-18 17:07:43,658 INFO  io.debezium.connector.mysql.SnapshotReader
   [] - Step 7: - 10403 of 131508640 rows scanned from table 
'test.right_info' after 01:00:00.952
2020-11-18 17:07:44,131 INFO  io.debezium.connector.mysql.SnapshotReader
   [] - Step 7: - 10404 of 131508640 rows scanned from table 
'test.right_info' after 01:00:01.425
2020-11-18 17:07:44,601 INFO  io.debezium.connector.mysql.SnapshotReader
   [] - Step 7: committing transaction
2020-11-18 17:07:44,601 ERROR io.debezium.connector.mysql.SnapshotReader
   [] - Failed due to error: Aborting snapshot due to error when last 
running 'SELECT * FROM `test`.`right_info`': Query execution was interrupted, 
maximum statement execution time exceeded
org.apache.kafka.connect.errors.ConnectException: Query execution was 
interrupted, maximum statement execution time exceeded Error code: 3024; 
SQLSTATE: HY000.
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[?:1.8.0_91]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
[?:1.8.0_91]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.sql.SQLException: Query execution was interrupted, maximum 
statement execution time exceeded
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
 ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at com.mysql.cj.jdbc.result.ResultSetImpl.next(ResultSetImpl.java:1739) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.SnapshotReader.lambda$execute$14(SnapshotReader.java:648)
 ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:473) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:641) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
... 3 more
2020-11-18 17:07:44,606 INFO  io.debezium.connector.common.BaseSourceTask   
   [] - Stopping down connector
2020-11-18 17:07:44,606 INFO  io.debezium.connector.mysql.MySqlConnectorTask
   [] - Stopping MySQL connector task
2020-11-18 17:07:44,606 INFO  io.debezium.connector.mysql.ChainedReader 
   [] - ChainedReader: Stopping the snapshot reader
2020-11-18 17:07:44,607 INFO  io.debezium.connector.mysql.SnapshotReader
   [] - Discarding 4363 unsent record(s) due to the connector shutting 
down
2020-11-18 17:07:44,607 INFO  io.debezium.connector.mysql.SnapshotReader
   [] - Discarding 0 unsent record(s) due to the connector shutting down
2020-11-18 17:07:44,608 INFO  io.debezium.connector.mysql.MySqlConnectorTask
   [] - Connector task finished all work and is now shutdown
2020-11-18 17:07:44,609 ERROR 
com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction[] - Reporting 
error:
org.apache.kafka.connect.errors.ConnectException: Query execution was 
interrupted, maximum statement execution time exceeded Error code: 3024; 
SQLSTATE: HY000.
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[?:1.8.0_91]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
[?:1.8.0_91]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.sql.SQLException: Query execution was interrupted, maximum 
statement execution time exceeded
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
 ~[flink-connector-mysql-cdc-1.1.0.

Flink SQL custom UDAF: No match found for function signature count_uadf()

2020-12-17 Thread
Flink version: 1.11.1
The UDAF code comes from the Alibaba Cloud documentation.

The code is below:
public class TestSql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env);
        //env.setParallelism(3);
        tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class);

        Properties configs = CommonUtils.getConfigs();
        // register the clazz source table
        FlinkUtils.registerMysqlTable2FlinkTable(
            tableEnv, configs.getProperty("url"),
            configs.getProperty("user.name"),
            configs.getProperty("password"),
            "test", "clazz_lesson");

        Table table = tableEnv.sqlQuery(
            "select count_uadf(clazz_number), clazz_number from clazz_lesson group by clazz_number");
        //Table table = tableEnv.sqlQuery(
        //    "select number, collect(extension_value) from clazz_extension group by number");
        tableEnv.toRetractStream(table, Row.class).print();
        env.execute();
    }
}



public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
    // Data structure of the accumulator that holds the state of the count UDAF.
    public static class CountAccum {
        public long total;
    }

    // Initializes the accumulator of the count UDAF.
    @Override
    public CountAccum createAccumulator() {
        CountAccum acc = new CountAccum();
        acc.total = 0;
        return acc;
    }

    // getValue computes the result of the count UDAF from the accumulator holding the state.
    @Override
    public Long getValue(CountAccum accumulator) {
        return accumulator.total;
    }

    // accumulate updates the accumulator for each input value.
    public void accumulate(CountAccum accumulator, Long iValue) {
        accumulator.total++;
    }

    public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
        for (CountAccum other : its) {
            accumulator.total += other.total;
        }
    }
}

The stack trace is below:

-
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. From line 1, column 8 to line 1, column 31: No match found 
for function signature count_uadf()
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
at 
com.gaotu.data.performance.flink.job.sql.TestSql.main(TestSql.java:34)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
column 8 to line 1, column 31: No match found for function signature 
count_uadf()
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.valida

Re: Flink SQL custom UDAF: No match found for function signature count_uadf()

2020-12-17 Thread
I have solved the problem myself.

> On December 17, 2020, at 9:00 PM, 丁浩浩 <18579099...@163.com> wrote:
> 
> Flink version: 1.11.1
> The UDAF code comes from the Alibaba Cloud documentation.
> 
> The code is below:
> public class TestSql {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env);
>         //env.setParallelism(3);
>         tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class);
> 
>         Properties configs = CommonUtils.getConfigs();
>         // register the clazz source table
>         FlinkUtils.registerMysqlTable2FlinkTable(
>             tableEnv, configs.getProperty("url"),
>             configs.getProperty("user.name"),
>             configs.getProperty("password"),
>             "test", "clazz_lesson");
> 
>         Table table = tableEnv.sqlQuery(
>             "select count_uadf(clazz_number), clazz_number from clazz_lesson group by clazz_number");
>         //Table table = tableEnv.sqlQuery(
>         //    "select number, collect(extension_value) from clazz_extension group by number");
>         tableEnv.toRetractStream(table, Row.class).print();
>         env.execute();
>     }
> }
> 
> 
> 
> public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
>     // Data structure of the accumulator that holds the state of the count UDAF.
>     public static class CountAccum {
>         public long total;
>     }
> 
>     // Initializes the accumulator of the count UDAF.
>     @Override
>     public CountAccum createAccumulator() {
>         CountAccum acc = new CountAccum();
>         acc.total = 0;
>         return acc;
>     }
> 
>     // getValue computes the result of the count UDAF from the accumulator holding the state.
>     @Override
>     public Long getValue(CountAccum accumulator) {
>         return accumulator.total;
>     }
> 
>     // accumulate updates the accumulator for each input value.
>     public void accumulate(CountAccum accumulator, Long iValue) {
>         accumulator.total++;
>     }
> 
>     public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
>         for (CountAccum other : its) {
>             accumulator.total += other.total;
>         }
>     }
> }
> 
> The stack trace is below:
> 
> -
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 1, column 8 to line 1, column 31: No match 
> found for function signature count_uadf()
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
>   at 
> com.gaotu.data.performance.flink.job.sql.TestSql.main(TestSql.java:34)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 8 to line 1, column 31: No match found for function signature 
> count_uadf()
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
>  

Questions about the flink-connector-jdbc source code

2021-05-12 Thread
Flink version: 1.11

1. The first question: the JDBC page under connectors in the official 1.11 documentation contains this sentence:
"Notice that scan.partition.lower-bound and scan.partition.upper-bound are just used to decide the partition stride, not for filtering the rows in table."
This does not match what I see in the source code: the scan.partition.lower-bound and scan.partition.upper-bound
parameters do cause rows to be filtered out. Today I checked the 1.13 documentation, and the sentence has been changed to "Notice that scan.partition.lower-bound and
scan.partition.upper-bound are used to decide the partition stride and filter the rows in table."
Does this mean the sentence in the 1.11 documentation was wrong?


2. While reading the InputFormat interface, I saw this note in the Javadoc:
"IMPORTANT NOTE: Input formats must be written such that an instance can be opened again after it was closed. That
is due to the fact that the input format is used for potentially multiple splits. After a split is done, the
format's close function is invoked and, if another split is available, the open function is invoked afterwards for
the next split."
My question is this: I set
scan.partition.num (the number of partitions)
to 10 and my parallelism to 2, so each slot has one InputFormat instance for reading data. When one partition has been read, close is called, then open is called again, and the next partition is read. Is this understanding correct?

Also, suppose I have 10 partitions and a parallelism of 3: how are the partitions assigned to the slots' InputFormat instances?
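A minimal sketch of a partitioned JDBC scan using the options discussed above; all connection values and table/column names are placeholders. Under these options the connector generates scan.partition.num range splits over scan.partition.column between the lower and upper bound; my understanding (not verified against the source here) is that each parallel InputFormat instance requests splits one after another, calling open()/close() once per split, which matches the Javadoc note quoted above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcPartitionedScan {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical JDBC source split into 10 range partitions over the 'id' column.
        tableEnv.executeSql(
                "CREATE TABLE orders_jdbc (" +
                "  id BIGINT," +
                "  order_number STRING" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/test'," +
                "  'table-name' = 'orders'," +
                "  'username' = 'flink'," +
                "  'password' = 'flink'," +
                "  'scan.partition.column' = 'id'," +
                "  'scan.partition.num' = '10'," +
                "  'scan.partition.lower-bound' = '1'," +
                "  'scan.partition.upper-bound' = '1000000'" +
                ")");

        tableEnv.executeSql("SELECT count(*) FROM orders_jdbc").print();
    }
}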