你好,感谢老师回复
`insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values
(1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,我理解这两种情况的话,对于数据库来说,就是一次插入与两次插入的问题了吧,要是数据量大的话,感觉对性能还是有影响的
| |
小昌同学
|
|
ccc0606fight...@163.com
|
---- 回复的原邮件 ----
| 发件人 | Shammon FY<zjur...@gmail.com> |
| 发送日期 | 2023年7月26日 10:52 |
| 收件人 | <user-zh@flink.apache.org> |
| 主题 | Re: JdbcSink引发的IO过高 |
Hi,

目前JdbcSink会为每个Sink创建PreparedStatement,当进行batch数据处理时,会先调用PreparedStatement的addBatch()函数将数据放入缓存,到达flush条件后调用executeBatch()函数批量发送数据到jdbc
server,这样会节省网络IO。

具体到数据库侧,我理解执行 `insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values
(1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,对底层存储可能差别不大,因为插入的数据量不会减少,具体你可以观察一下

Best,
Shammon FY


On Tue, Jul 25, 2023 at 4:02 PM 小昌同学 <ccc0606fight...@163.com> wrote:

各位老师好,我这边在使用Flink 的JdbcSink的时候,有一个疑问想请教一下各位老师:

我的代码如下:我代码中设定的每一个批次插入1000条,或者是每隔200ms插入一次数据,但是由于我司musql资源不行,从监控页面看插入的IO过高,我想请教一下我使用这样的insert语句,当我积累了1000条,是怎么样的格式
是
insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values
(1,2,3,4,5,6),(1,2,3,4,9,10);
或者是
insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6)
insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10)
如果是第二种情况,有什么样的方式可以转换为第一种情况,这样的话会大大减少IO



|
errorStream.addSink(JdbcSink.sink(
"insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)",
(statement, result) -> {
statement.setString(1,result.getAction());
statement.setString(2,result.getServerIp());
statement.setString(3,result.getHandleSerialno());
statement.setString(4,result.getMd5Num());
statement.setString(5,result.getInsertTime());
statement.setString(6, result.getDateTime());
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

.withUrl("jdbc:mysql://111/data_ret_log?useSSL=false&useUnicode=false&failOverReadOnly=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnectForPools=true&serverTimezone=Asia/Shanghai&autoReconnect=true")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("111")
.withPassword("222")
.build()
)).name("sink-error-mysql");
|
| |
小昌同学
|
|
ccc0606fight...@163.com
|

Reply via email to