Flink version: 1.16.0
I am using Flink SQL to join multiple streams and write the result to ClickHouse.
The query is as follows:
select \
header.id as id, \
LAST_VALUE(header.order_status), \
LAST_VALUE(header.customer_id), \
LAST_VALUE(header.shop_id), \
LAST_VALUE(header.parent_order_id), \
LAST_VALUE(header.order_at), \
LAST_VALUE(header.pay_at), \
LAST_VALUE(header.channel_id), \
LAST_VALUE(header.root_order_id), \
LAST_VALUE(header.last_updated_at), \
item.id as item_id, \
LAST_VALUE(item.order_id) as order_id, \
LAST_VALUE(item.row_num), \
LAST_VALUE(item.goods_id), \
LAST_VALUE(item.s_sku_code), \
LAST_VALUE(item.qty), \
LAST_VALUE(item.p_paid_sub_amt), \
LAST_VALUE(item.p_sp_sub_amt), \
LAST_VALUE(item.bom_type), \
LAST_VALUE(item.last_updated_at) as item_last_updated_at, \
LAST_VALUE(item.display_qty), \
LAST_VALUE(delivery.del_type), \
LAST_VALUE(delivery.time_slot_type), \
LAST_VALUE(delivery.time_slot_date), \
LAST_VALUE(delivery.time_slot_time_from), \
LAST_VALUE(delivery.time_slot_time_to), \
LAST_VALUE(delivery.sku_delivery_type), \
LAST_VALUE(delivery.last_updated_at) as del_last_updated_at, \
LAST_VALUE(promotion.id) as promo_id, \
LAST_VALUE(promotion.order_item_id), \
LAST_VALUE(promotion.p_promo_amt), \
LAST_VALUE(promotion.promotion_category), \
LAST_VALUE(promotion.promo_type), \
LAST_VALUE(promotion.promo_sub_type), \
LAST_VALUE(promotion.last_updated_at) as promo_last_updated_at, \
LAST_VALUE(promotion.promotion_cost) \
from \
  item \
  join \
  header  \
  on item.order_id = header.id \
  left join \
  delivery \
  on item.order_id = delivery.order_id \
  left join \
  promotion \
  on item.id = promotion.order_item_id \
  group by header.id,item.id
In the Flink Web UI I can see that the job is under severe backpressure, and it also crashes from time to time:
https://pic.imgdb.cn/item/63d8bebbface21e9ef3c92fe.jpg
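I followed an article from JD.com, https://flink-learning.org.cn/article/detail/1e86b8b38faaeefd5ed7f70858aa40bc, and adjusted the related parameters. However, some of these features already seem to be optimized in Flink 1.16, and adding the parameters below had no noticeable effect on the job.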


conf.setString("table.exec.mini-batch.enabled", "true");
conf.setString("table.exec.mini-batch.allow-latency", "15 s");
conf.setString("table.exec.mini-batch.size", "5000");
conf.setString("table.exec.state.ttl", "86400 s");
conf.setString("table.exec.disabled-operators", "NestedLoopJoin");
conf.setString("table.optimizer.join.broadcast-threshold", "-1");
conf.setString("table.optimizer.multiple-input-enabled", "true");
conf.setString("table.exec.shuffle-mode", "POINTWISE_EDGES_PIPELINED");
conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "8");
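
For readers unfamiliar with the mini-batch settings above, here is a rough plain-Java sketch of the idea behind them: records are buffered per key and folded in memory, and keyed state is touched only once per key when the batch is flushed. This is a toy model and not Flink's implementation. The class MiniBatchSketch, its methods, and the sample keys are made up for illustration, and the latency-based flush (the role of table.exec.mini-batch.allow-latency) is only indicated in a comment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of mini-batch aggregation (hypothetical; not Flink code). */
class MiniBatchSketch {
    // Plays the role of table.exec.mini-batch.size.
    private final int maxBatchSize;
    // In-memory buffer: keeps only the latest value per key, like LAST_VALUE.
    private final Map<String, String> buffer = new LinkedHashMap<>();
    // Stands in for the operator's keyed state (the expensive part to access).
    private final Map<String, String> state = new LinkedHashMap<>();
    private int buffered = 0;
    private int stateWrites = 0;

    MiniBatchSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /** Buffers one record and flushes once the batch size is reached. */
    void add(String key, String value) {
        buffer.put(key, value);
        buffered++;
        if (buffered >= maxBatchSize) {
            flush();
        }
    }

    /** In a real job a timer would also call this after the allowed latency. */
    void flush() {
        for (Map.Entry<String, String> e : buffer.entrySet()) {
            state.put(e.getKey(), e.getValue()); // one state write per key
            stateWrites++;
        }
        buffer.clear();
        buffered = 0;
    }

    int stateWrites() {
        return stateWrites;
    }

    String get(String key) {
        return state.get(key);
    }

    public static void main(String[] args) {
        MiniBatchSketch agg = new MiniBatchSketch(4);
        agg.add("order-1", "CREATED");
        agg.add("order-1", "PAID");
        agg.add("order-2", "CREATED");
        agg.add("order-1", "SHIPPED"); // fourth record triggers the flush
        System.out.println(agg.get("order-1") + " after " + agg.stateWrites() + " state writes");
    }
}
```

In this sketch four input records result in two state writes, one per distinct key. A larger batch size or latency trades result freshness for fewer state accesses.
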
My question: how should backpressure be handled in Flink SQL, and what other optimization options are available?


