回复: JdbcSink引发的IO过高

2023-07-25 文章 小昌同学
你好,感谢老师回复
`insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values
(1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,我理解这两种情况的话,对于数据库来说,就是一次插入与两次插入的问题了吧,要是数据量大的话,感觉对性能还是有影响的
| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Shammon FY |
| 发送日期 | 2023年7月26日 10:52 |
| 收件人 |  |
| 主题 | Re: JdbcSink引发的IO过高 |
Hi,

目前JdbcSink会为每个Sink创建PreparedStatement,当进行batch数据处理时,会先调用PreparedStatement的addBatch()函数将数据放入缓存,到达flush条件后调用executeBatch()函数批量发送数据到jdbc
server,这样会节省网络IO。

具体到数据库侧,我理解执行 `insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values
(1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,对底层存储可能差别不大,因为插入的数据量不会减少,具体你可以观察一下

Best,
Shammon FY


On Tue, Jul 25, 2023 at 4:02 PM 小昌同学  wrote:

各位老师好,我这边在使用Flink 的JdbcSink的时候,有一个疑问想请教一下各位老师:

我的代码如下:我代码中设定的每一个批次插入1000条,或者是每隔200ms插入一次数据,但是由于我司musql资源不行,从监控页面看插入的IO过高,我想请教一下我使用这样的insert语句,当我积累了1000条,是怎么样的格式
是
insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values
(1,2,3,4,5,6),(1,2,3,4,9,10);
或者是
insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6)
insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10)
如果是第二种情况,有什么样的方式可以转换为第一种情况,这样的话会大大减少IO



|
errorStream.addSink(JdbcSink.sink(
"insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)",
(statement, result) -> {
statement.setString(1,result.getAction());
statement.setString(2,result.getServerIp());
statement.setString(3,result.getHandleSerialno());
statement.setString(4,result.getMd5Num());
statement.setString(5,result.getInsertTime());
statement.setString(6, result.getDateTime());
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

.withUrl("jdbc:mysql://111/data_ret_log?useSSL=false&useUnicode=false&failOverReadOnly=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnectForPools=true&serverTimezone=Asia/Shanghai&autoReconnect=true")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("111")
.withPassword("222")
.build()
)).name("sink-error-mysql");
|
| |
小昌同学
|
|
ccc0606fight...@163.com
|


Re: JdbcSink引发的IO过高

2023-07-25 文章 Shammon FY
Hi,

目前JdbcSink会为每个Sink创建PreparedStatement,当进行batch数据处理时,会先调用PreparedStatement的addBatch()函数将数据放入缓存,到达flush条件后调用executeBatch()函数批量发送数据到jdbc
server,这样会节省网络IO。

具体到数据库侧,我理解执行 `insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values
(1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,对底层存储可能差别不大,因为插入的数据量不会减少,具体你可以观察一下

Best,
Shammon FY


On Tue, Jul 25, 2023 at 4:02 PM 小昌同学  wrote:

> 各位老师好,我这边在使用Flink 的JdbcSink的时候,有一个疑问想请教一下各位老师:
>
> 我的代码如下:我代码中设定的每一个批次插入1000条,或者是每隔200ms插入一次数据,但是由于我司musql资源不行,从监控页面看插入的IO过高,我想请教一下我使用这样的insert语句,当我积累了1000条,是怎么样的格式
> 是
> insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values
> (1,2,3,4,5,6),(1,2,3,4,9,10);
> 或者是
> insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6)
> insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10)
> 如果是第二种情况,有什么样的方式可以转换为第一种情况,这样的话会大大减少IO
>
>
>
> |
> errorStream.addSink(JdbcSink.sink(
> "insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)",
> (statement, result) -> {
> statement.setString(1,result.getAction());
> statement.setString(2,result.getServerIp());
> statement.setString(3,result.getHandleSerialno());
> statement.setString(4,result.getMd5Num());
> statement.setString(5,result.getInsertTime());
> statement.setString(6, result.getDateTime());
> },
> JdbcExecutionOptions.builder()
> .withBatchSize(1000)
> .withBatchIntervalMs(200)
> .withMaxRetries(5)
> .build(),
> new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
>
> .withUrl("jdbc:mysql://111/data_ret_log?useSSL=false&useUnicode=false&failOverReadOnly=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnectForPools=true&serverTimezone=Asia/Shanghai&autoReconnect=true")
> .withDriverName("com.mysql.jdbc.Driver")
> .withUsername("111")
> .withPassword("222")
> .build()
> )).name("sink-error-mysql");
> |
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: 关于DataStream API计算批数据的聚合值

2023-07-25 文章 weijie guo
你好:

Batch 模式下的 reduce 操作默认应该就是只输出最后一条数据(per-key)的。Agg 的话可能有点麻烦,可以使用
GlobalWindow + 自定义 Trigger 来 Workaround.

Best regards,

Weijie


Liu Join  于2023年7月26日周三 09:10写道:

> 例如:我使用DataStream api计算批数据也就是有界流的平均值,如何实现只输出最后一条平均值的数据,不输出中间值
>


Re: 关于DataStream API计算批数据的聚合值

2023-07-25 文章 Shammon FY
Hi,

跟使用普通流式作业的DataStream用法一样,只需要在RuntimeMode里使用Batch模式,Flink在Batch模式下会只输出最后的结果,而不会输出中间结果。具体可以参考Flink里的WordCount例子
[1]

[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java


On Wed, Jul 26, 2023 at 9:10 AM Liu Join  wrote:

> 例如:我使用DataStream api计算批数据也就是有界流的平均值,如何实现只输出最后一条平均值的数据,不输出中间值
>


关于DataStream API计算批数据的聚合值

2023-07-25 文章 Liu Join
例如:我使用DataStream api计算批数据也就是有界流的平均值,如何实现只输出最后一条平均值的数据,不输出中间值


JdbcSink引发的IO过高

2023-07-25 文章 小昌同学
各位老师好,我这边在使用Flink 的JdbcSink的时候,有一个疑问想请教一下各位老师:
我的代码如下:我代码中设定的每一个批次插入1000条,或者是每隔200ms插入一次数据,但是由于我司musql资源不行,从监控页面看插入的IO过高,我想请教一下我使用这样的insert语句,当我积累了1000条,是怎么样的格式
是
insert  into error_md5_info (action, serverIp, 
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10);
或者是
insert  into error_md5_info (action, serverIp, 
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6)
insert  into error_md5_info (action, serverIp, 
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10)
如果是第二种情况,有什么样的方式可以转换为第一种情况,这样的话会大大减少IO



|
errorStream.addSink(JdbcSink.sink(
"insert  into error_md5_info (action, serverIp, 
handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)",
(statement, result) -> {
statement.setString(1,result.getAction());
statement.setString(2,result.getServerIp());
statement.setString(3,result.getHandleSerialno());
statement.setString(4,result.getMd5Num());
statement.setString(5,result.getInsertTime());
statement.setString(6, result.getDateTime());
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

.withUrl("jdbc:mysql://111/data_ret_log?useSSL=false&useUnicode=false&failOverReadOnly=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnectForPools=true&serverTimezone=Asia/Shanghai&autoReconnect=true")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("111")
.withPassword("222")
.build()
)).name("sink-error-mysql");
|
| |
小昌同学
|
|
ccc0606fight...@163.com
|