你说的是upsert-kafka的这两个参数吗? sink.buffer-flush.max-rows sink.buffer-flush.interval 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。
在 2021-12-25 22:54:19,"郭伟权" <gwqloveli...@gmail.com> 写道: >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量 > >casel.chen <casel_c...@126.com> 于2021年12月23日周四 08:15写道: > >> flink sql中aggregate without >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? >> >> >> 例如有下面binlog cdc购买数据(订单购买金额会更新): >> >> orderid. category dt amt >> >> 订单id 商品类型 购买时间(yyyyMMddHH) 购买金额 >> >> >> >> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 >> >> >> >> INSERT INTO mysql_sink_table >> >> SELECT category, dt, LAST_VALUE(total) >> >> OVER ( >> >> PARTITION BY category >> >> ORDER BY PROCTIME() >> >> RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW >> >> ) AS var1 >> >> FROM ( >> >> SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt >> >> );