Hi xiexinyuan341,
我理解这边有 2 个问题:
1. “偶尔会出现连接超时”,这个的话有具体的堆栈吗。如果是因为长时间没有数据的查询导致 connection invalid
话,这个在1.12,1.11.3 中应该是解决了[1].
2. 你的 source 是什么组件呢?程序抛异常的话,自动重启或者手动重启话,如果是 “最少一次” 语义的话,应该还是会 join 上 sink
到下游的;或者可以开启 checkpoint,保证 flink 内部的 “精确一次”。
[1] https://issues.apache.org/jira/browse/FLINK-16
Hi,我使用的1.12也出现了这个问题,请问你怎么解决的呢?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
souce是kafka,使用JdbcRowDataLookupFunction作为维表.异常信息是这样的,看了下日志,这种异常基本上每10多分钟就会有一次.
2020-11-12 01:00:09.028 ERROR JdbcRowDataLookupFunction.java:170 JDBC
executeBatch error, retry times = 1
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link
failure
The last packet successfully
source组件使用的是 kafka connector , 使用的JdbcRowDataLookupFunction 作为维表查询.
报错信息是下面这种:
2020-11-12 00:10:00.153 ERROR JdbcRowDataLookupFunction.java:170 JDBC
executeBatch error, retry times = 1
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link
failure
The last packet successfull
这个应该就是长时间没有数据传输导致的 链接不可用,其中可能是:
1、kakfa 的数据和稀疏,数据达到的时间间隔大于 “wait_timeout“
2、一直没有 join 上 mysql 的数据导致的。
可以设置下 数据库的 wait_timeout 看下
PS,如果这个场景,自动恢复应该是没问题的,但是需要确定下根本原因,看是正常的还是异常的,怎么去避免。
最好设置下 checkpoint,这个 kafka 的 offset 是在checkpoint 成功的时候才 ack的,这样就不会导致
这条数据被自动ack而丢弃的。
如果开启 checkpoint 的话,下游支持 upsert