你说的是upsert-kafka的这两个参数吗?

sink.buffer-flush.max-rows
sink.buffer-flush.interval
确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink 
kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。

















在 2021-12-25 22:54:19,"郭伟权" <gwqloveli...@gmail.com> 写道:
>结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量
>
>casel.chen <casel_c...@126.com> 于2021年12月23日周四 08:15写道:
>
>> flink sql中aggregate without
>> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
>> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
>> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
>>
>>
>> 例如有下面binlog cdc购买数据(订单购买金额会更新):
>>
>> orderid.   category    dt                                          amt
>>
>> 订单id     商品类型   购买时间(yyyyMMddHH)      购买金额
>>
>>
>>
>>
>> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
>>
>>
>>
>> INSERT INTO mysql_sink_table
>>
>> SELECT category, dt, LAST_VALUE(total)
>>
>> OVER (
>>
>>   PARTITION BY category
>>
>>   ORDER BY PROCTIME()
>>
>>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
>>
>> ) AS var1
>>
>> FROM (
>>
>>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
>>
>> );

回复