mini-batch优化针对sink算子也有效吗?我是直接aggregate without window然后将聚合结果输出到sink算子。

















在 2022-01-06 20:43:00,"Benchao Li" <libenc...@apache.org> 写道:
>这个问题可以用mini-batch[1]来解决呀
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation
>
>casel.chen <casel_c...@126.com> 于2021年12月26日周日 18:01写道:
>
>> 你说的是upsert-kafka的这两个参数吗?
>>
>> sink.buffer-flush.max-rows
>> sink.buffer-flush.interval
>> 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink
>> kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2021-12-25 22:54:19,"郭伟权" <gwqloveli...@gmail.com> 写道:
>>
>> >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量
>> >
>> >casel.chen <casel_c...@126.com> 于2021年12月23日周四 08:15写道:
>> >
>> >> flink sql中aggregate without
>> >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
>> >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
>> >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
>> >>
>> >>
>> >> 例如有下面binlog cdc购买数据(订单购买金额会更新):
>> >>
>> >> orderid.   category    dt                                          amt
>> >>
>> >> 订单id     商品类型   购买时间(yyyyMMddHH)      购买金额
>> >>
>> >>
>> >>
>> >>
>> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
>> >>
>> >>
>> >>
>> >> INSERT INTO mysql_sink_table
>> >>
>> >> SELECT category, dt, LAST_VALUE(total)
>> >>
>> >> OVER (
>> >>
>> >>   PARTITION BY category
>> >>
>> >>   ORDER BY PROCTIME()
>> >>
>> >>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
>> >>
>> >> ) AS var1
>> >>
>> >> FROM (
>> >>
>> >>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
>> >>
>> >> );
>>
>
>
>-- 
>
>Best,
>Benchao Li

回复