Re: flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据

2020-06-18 文章 Leonard Xu
Hello

1.7.2是比较老的版本了, 可以考虑下升级新的版本,新的版本都支持你所需的功能的。

1.10.0 && 1.10.1 文档[1],对应的两个参数:

  'connector.write.flush.max-rows' = '5000', -- optional, flush max size 
(includes all append, upsert and delete records), 
 -- 
over this number of records, will flush data. The default value is "5000".
  'connector.write.flush.interval' = '2s', --optional, flush interval 
mills, over this time, asynchronous threads will flush data.
如果使用1.10版本,推荐使用1.10.1版本,1.10.1在1.10.0的基础上修复了一些bug。

社区即将发布的1.11.0 文档[2], 对应的两个参数:
sink.buffer-flush.max-rows  -- The max size of buffered records before 
flush. Can be set to zero to disable it.
sink.buffer-flush.interval  -- The flush interval mills, over this time, 
asynchronous threads will flush data. 
-- Can be set to '0' to disable 
it. Note, 'sink.buffer-flush.max-rows' can be set to '0' 
-- with the flush interval set 
allowing for complete async processing of buffered actions.

Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector
 

 
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#connector-options
 

 


> 在 2020年6月18日,17:04,nicygan  写道:
> 
> dear all:
> 我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
> 
> 
> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
>.setDrivername("com.mysql.jdbc.Driver")
>.setDBUrl("jdbc:mysql://localhost:3306/flink")
>.setUsername("root")
>.setPassword("123456")
>.setQuery(sql2)
>.setParameterTypes(types)
> .setBatchSize(1000)
>   .build();
> 
> === 问题 
> 如果上游数据来源时间是:
> 10:00 -> 900条
> 10:10 -> 120条
> 11:50 -> 1100条
> 15:00 -> 900条
> 
> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
> 10:10 -> 写入1000条,剩20条下次写入
> 11:50 -> 写入1000条,剩30条下次写入
> 15:00 -> 写入1000条,剩10条下次写入
> 
> 我想要达到等待20分种,不满足batchSize也写入,能否实现?
> 10:10 -> 写入1000条,剩20条下次写入
> 10:30 -> 写入20条
> 11:50 -> 写入1000条,剩10条下次写入
> 12:10 -> 写入10条
> 15:20 -> 写入900条
> 
> thanks



Re:Re: flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据

2020-06-18 文章 nicygan



请问timeout值是多少?在哪里可设置?



在 2020-06-18 17:43:31,"Benchao Li"  写道:
>我理解现在就是你想要的效果。
>batch-size和timeout两个条件是达到一个就会flush的。
>
>nicygan  于2020年6月18日周四 下午5:05写道:
>
>> dear all:
>>  我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
>>
>>
>> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
>> .setDrivername("com.mysql.jdbc.Driver")
>> .setDBUrl("jdbc:mysql://localhost:3306/flink")
>> .setUsername("root")
>> .setPassword("123456")
>> .setQuery(sql2)
>> .setParameterTypes(types)
>>  .setBatchSize(1000)
>>.build();
>>
>> === 问题 
>> 如果上游数据来源时间是:
>> 10:00 -> 900条
>> 10:10 -> 120条
>> 11:50 -> 1100条
>> 15:00 -> 900条
>>
>> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
>> 10:10 -> 写入1000条,剩20条下次写入
>> 11:50 -> 写入1000条,剩30条下次写入
>> 15:00 -> 写入1000条,剩10条下次写入
>>
>> 我想要达到等待20分种,不满足batchSize也写入,能否实现?
>> 10:10 -> 写入1000条,剩20条下次写入
>> 10:30 -> 写入20条
>> 11:50 -> 写入1000条,剩10条下次写入
>> 12:10 -> 写入10条
>> 15:20 -> 写入900条
>>
>> thanks
>>
>
>
>-- 
>
>Best,
>Benchao Li


Re: flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据

2020-06-18 文章 Benchao Li
我理解现在就是你想要的效果。
batch-size和timeout两个条件是达到一个就会flush的。

nicygan  于2020年6月18日周四 下午5:05写道:

> dear all:
>  我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
>
>
> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
> .setDrivername("com.mysql.jdbc.Driver")
> .setDBUrl("jdbc:mysql://localhost:3306/flink")
> .setUsername("root")
> .setPassword("123456")
> .setQuery(sql2)
> .setParameterTypes(types)
>  .setBatchSize(1000)
>.build();
>
> === 问题 
> 如果上游数据来源时间是:
> 10:00 -> 900条
> 10:10 -> 120条
> 11:50 -> 1100条
> 15:00 -> 900条
>
> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
> 10:10 -> 写入1000条,剩20条下次写入
> 11:50 -> 写入1000条,剩30条下次写入
> 15:00 -> 写入1000条,剩10条下次写入
>
> 我想要达到等待20分种,不满足batchSize也写入,能否实现?
> 10:10 -> 写入1000条,剩20条下次写入
> 10:30 -> 写入20条
> 11:50 -> 写入1000条,剩10条下次写入
> 12:10 -> 写入10条
> 15:20 -> 写入900条
>
> thanks
>


-- 

Best,
Benchao Li


flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据

2020-06-18 文章 nicygan
dear all:
 我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。


JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/flink")
.setUsername("root")
.setPassword("123456")
.setQuery(sql2)
.setParameterTypes(types)
 .setBatchSize(1000)
   .build();

=== 问题 
如果上游数据来源时间是:
10:00 -> 900条
10:10 -> 120条
11:50 -> 1100条
15:00 -> 900条

JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
10:10 -> 写入1000条,剩20条下次写入
11:50 -> 写入1000条,剩30条下次写入
15:00 -> 写入1000条,剩10条下次写入

我想要达到等待20分种,不满足batchSize也写入,能否实现?
10:10 -> 写入1000条,剩20条下次写入
10:30 -> 写入20条
11:50 -> 写入1000条,剩10条下次写入
12:10 -> 写入10条
15:20 -> 写入900条

thanks