Re: Re: Flink SQL 如何优化以及处理反压
可以看一下反压算子是否出现在同一台机器(排除单点故障)。比如使用了 rocksdb + hdd 盘;单机负载过高;磁盘打满等。 如果不是单点故障,可以打 jstack 查看对应的线程具体在执行什么样的操作,再进行相应的逻辑优化。 On Tue, Jan 31, 2023 at 6:01 PM lxk wrote: > 现在从web ui上看,瓶颈主要在于group by 聚合函数之后去重这个逻辑。 > 而且SQL这个并行度是全局设置的,没法针对某一个特定的算子设置并行度,并行度多了之后,资源又感觉有点吃紧。 > > > > > > > > > > > > > > > > > > 在 2023-01-31 17:45:15,"weijie guo" 写道: > >最好先找到导致下游处理过慢的瓶颈算子,适当扩大一下并发。如果还不行,看下jstack的情况,可能需要调整逻辑。 > > > >Best regards, > > > >Weijie > > > > > >ssmq <374060...@qq.com.invalid> 于2023年1月31日周二 17:22写道: > > > >> 你可以测试不写入clickhouse是否还存在反压,如果不是因为写入瓶颈的话就从你的处理逻辑优化了 > >> > >> > >> 发件人: lxk > >> 发送时间: 2023年1月31日 15:16 > >> 收件人: user-zh@flink.apache.org > >> 主题: Flink SQL 如何优化以及处理反压 > >> > >> Flink版本:1.16.0 > >> 目前在使用Flink SQL进行多流关联,并写入Clickhouse中 > >> 具体代码如下: > >> select \ > >> header.id as id, \ > >> LAST_VALUE(header.order_status), \ > >> LAST_VALUE(header.customer_id), \ > >> LAST_VALUE(header.shop_id), \ > >> LAST_VALUE(header.parent_order_id), \ > >> LAST_VALUE(header.order_at), \ > >> LAST_VALUE(header.pay_at), \ > >> LAST_VALUE(header.channel_id), \ > >> LAST_VALUE(header.root_order_id), \ > >> LAST_VALUE(header.last_updated_at), \ > >> item.id as item_id, \ > >> LAST_VALUE(item.order_id) as order_id, \ > >> LAST_VALUE(item.row_num), \ > >> LAST_VALUE(item.goods_id), \ > >> LAST_VALUE(item.s_sku_code), \ > >> LAST_VALUE(item.qty), \ > >> LAST_VALUE(item.p_paid_sub_amt), \ > >> LAST_VALUE(item.p_sp_sub_amt), \ > >> LAST_VALUE(item.bom_type), \ > >> LAST_VALUE(item.last_updated_at) as item_last_updated_at, \ > >> LAST_VALUE(item.display_qty), \ > >> LAST_VALUE(delivery.del_type), \ > >> LAST_VALUE(delivery.time_slot_type), \ > >> LAST_VALUE(delivery.time_slot_date), \ > >> LAST_VALUE(delivery.time_slot_time_from), \ > >> LAST_VALUE(delivery.time_slot_time_to), \ > >> LAST_VALUE(delivery.sku_delivery_type), \ > >> LAST_VALUE(delivery.last_updated_at) as del_last_updated_at, \ > >> LAST_VALUE(promotion.id) as promo_id, \ > >> LAST_VALUE(promotion.order_item_id), \ > >> LAST_VALUE(promotion.p_promo_amt), \ > >> LAST_VALUE(promotion.promotion_category), \ > >> LAST_VALUE(promotion.promo_type), \ > >> LAST_VALUE(promotion.promo_sub_type), \ > >> LAST_VALUE(promotion.last_updated_at) as promo_last_updated_at, \ > >> LAST_VALUE(promotion.promotion_cost) \ > >> from \ > >> item \ > >> join \ > >> header \ > >> on item.order_id = header.id \ > >> left join \ > >> delivery \ > >> on item.order_id = delivery.order_id \ > >> left join \ > >> promotion \ > >> on item.id =promotion.order_item_id \ > >> group by header.id,item.id > >> 在Flink WEB UI 上发现程序反压很严重,而且时不时挂掉: > >> https://pic.imgdb.cn/item/63d8bebbface21e9ef3c92fe.jpg > >> > >> 参考了京东的一篇文章 > >> > https://flink-learning.org.cn/article/detail/1e86b8b38faaeefd5ed7f70858aa40bc > >> ,对相关参数做了调整,但是发现有些功能在Flink 1.16中已经做了相关优化了,同时加了这些参数之后对程序没有起到任何优化的作用。 > >> > >> conf.setString("table.exec.mini-batch.enabled", "true"); > >> conf.setString("table.exec.mini-batch.allow-latency", "15 s"); > >> conf.setString("table.exec.mini-batch.size", "5000"); > >> conf.setString("table.exec.state.ttl", "86400 s"); > >> conf.setString("table.exec.disabled-operators", "NestedLoopJoin"); > >> conf.setString("table.optimizer.join.broadcast-threshold", "-1"); > >> conf.setString("table.optimizer.multiple-input-enabled", "true"); > >> conf.setString("table.exec.shuffle-mode", "POINTWISE_EDGES_PIPELINED"); > >> conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "8"); > >> 想请教下,针对Flink SQL如何处理反压,同时有什么其他的优化手段? > >> > >> > >> > >> >
Re:Re: Flink SQL 如何优化以及处理反压
现在从web ui上看,瓶颈主要在于group by 聚合函数之后去重这个逻辑。 而且SQL这个并行度是全局设置的,没法针对某一个特定的算子设置并行度,并行度多了之后,资源又感觉有点吃紧。 在 2023-01-31 17:45:15,"weijie guo" 写道: >最好先找到导致下游处理过慢的瓶颈算子,适当扩大一下并发。如果还不行,看下jstack的情况,可能需要调整逻辑。 > >Best regards, > >Weijie > > >ssmq <374060...@qq.com.invalid> 于2023年1月31日周二 17:22写道: > >> 你可以测试不写入clickhouse是否还存在反压,如果不是因为写入瓶颈的话就从你的处理逻辑优化了 >> >> >> 发件人: lxk >> 发送时间: 2023年1月31日 15:16 >> 收件人: user-zh@flink.apache.org >> 主题: Flink SQL 如何优化以及处理反压 >> >> Flink版本:1.16.0 >> 目前在使用Flink SQL进行多流关联,并写入Clickhouse中 >> 具体代码如下: >> select \ >> header.id as id, \ >> LAST_VALUE(header.order_status), \ >> LAST_VALUE(header.customer_id), \ >> LAST_VALUE(header.shop_id), \ >> LAST_VALUE(header.parent_order_id), \ >> LAST_VALUE(header.order_at), \ >> LAST_VALUE(header.pay_at), \ >> LAST_VALUE(header.channel_id), \ >> LAST_VALUE(header.root_order_id), \ >> LAST_VALUE(header.last_updated_at), \ >> item.id as item_id, \ >> LAST_VALUE(item.order_id) as order_id, \ >> LAST_VALUE(item.row_num), \ >> LAST_VALUE(item.goods_id), \ >> LAST_VALUE(item.s_sku_code), \ >> LAST_VALUE(item.qty), \ >> LAST_VALUE(item.p_paid_sub_amt), \ >> LAST_VALUE(item.p_sp_sub_amt), \ >> LAST_VALUE(item.bom_type), \ >> LAST_VALUE(item.last_updated_at) as item_last_updated_at, \ >> LAST_VALUE(item.display_qty), \ >> LAST_VALUE(delivery.del_type), \ >> LAST_VALUE(delivery.time_slot_type), \ >> LAST_VALUE(delivery.time_slot_date), \ >> LAST_VALUE(delivery.time_slot_time_from), \ >> LAST_VALUE(delivery.time_slot_time_to), \ >> LAST_VALUE(delivery.sku_delivery_type), \ >> LAST_VALUE(delivery.last_updated_at) as del_last_updated_at, \ >> LAST_VALUE(promotion.id) as promo_id, \ >> LAST_VALUE(promotion.order_item_id), \ >> LAST_VALUE(promotion.p_promo_amt), \ >> LAST_VALUE(promotion.promotion_category), \ >> LAST_VALUE(promotion.promo_type), \ >> LAST_VALUE(promotion.promo_sub_type), \ >> LAST_VALUE(promotion.last_updated_at) as promo_last_updated_at, \ >> LAST_VALUE(promotion.promotion_cost) \ >> from \ >> item \ >> join \ >> header \ >> on item.order_id = header.id \ >> left join \ >> delivery \ >> on item.order_id = delivery.order_id \ >> left join \ >> promotion \ >> on item.id =promotion.order_item_id \ >> group by header.id,item.id >> 在Flink WEB UI 上发现程序反压很严重,而且时不时挂掉: >> https://pic.imgdb.cn/item/63d8bebbface21e9ef3c92fe.jpg >> >> 参考了京东的一篇文章 >> https://flink-learning.org.cn/article/detail/1e86b8b38faaeefd5ed7f70858aa40bc >> ,对相关参数做了调整,但是发现有些功能在Flink 1.16中已经做了相关优化了,同时加了这些参数之后对程序没有起到任何优化的作用。 >> >> conf.setString("table.exec.mini-batch.enabled", "true"); >> conf.setString("table.exec.mini-batch.allow-latency", "15 s"); >> conf.setString("table.exec.mini-batch.size", "5000"); >> conf.setString("table.exec.state.ttl", "86400 s"); >> conf.setString("table.exec.disabled-operators", "NestedLoopJoin"); >> conf.setString("table.optimizer.join.broadcast-threshold", "-1"); >> conf.setString("table.optimizer.multiple-input-enabled", "true"); >> conf.setString("table.exec.shuffle-mode", "POINTWISE_EDGES_PIPELINED"); >> conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "8"); >> 想请教下,针对Flink SQL如何处理反压,同时有什么其他的优化手段? >> >> >> >>
Re: Flink SQL 如何优化以及处理反压
最好先找到导致下游处理过慢的瓶颈算子,适当扩大一下并发。如果还不行,看下jstack的情况,可能需要调整逻辑。 Best regards, Weijie ssmq <374060...@qq.com.invalid> 于2023年1月31日周二 17:22写道: > 你可以测试不写入clickhouse是否还存在反压,如果不是因为写入瓶颈的话就从你的处理逻辑优化了 > > > 发件人: lxk > 发送时间: 2023年1月31日 15:16 > 收件人: user-zh@flink.apache.org > 主题: Flink SQL 如何优化以及处理反压 > > Flink版本:1.16.0 > 目前在使用Flink SQL进行多流关联,并写入Clickhouse中 > 具体代码如下: > select \ > header.id as id, \ > LAST_VALUE(header.order_status), \ > LAST_VALUE(header.customer_id), \ > LAST_VALUE(header.shop_id), \ > LAST_VALUE(header.parent_order_id), \ > LAST_VALUE(header.order_at), \ > LAST_VALUE(header.pay_at), \ > LAST_VALUE(header.channel_id), \ > LAST_VALUE(header.root_order_id), \ > LAST_VALUE(header.last_updated_at), \ > item.id as item_id, \ > LAST_VALUE(item.order_id) as order_id, \ > LAST_VALUE(item.row_num), \ > LAST_VALUE(item.goods_id), \ > LAST_VALUE(item.s_sku_code), \ > LAST_VALUE(item.qty), \ > LAST_VALUE(item.p_paid_sub_amt), \ > LAST_VALUE(item.p_sp_sub_amt), \ > LAST_VALUE(item.bom_type), \ > LAST_VALUE(item.last_updated_at) as item_last_updated_at, \ > LAST_VALUE(item.display_qty), \ > LAST_VALUE(delivery.del_type), \ > LAST_VALUE(delivery.time_slot_type), \ > LAST_VALUE(delivery.time_slot_date), \ > LAST_VALUE(delivery.time_slot_time_from), \ > LAST_VALUE(delivery.time_slot_time_to), \ > LAST_VALUE(delivery.sku_delivery_type), \ > LAST_VALUE(delivery.last_updated_at) as del_last_updated_at, \ > LAST_VALUE(promotion.id) as promo_id, \ > LAST_VALUE(promotion.order_item_id), \ > LAST_VALUE(promotion.p_promo_amt), \ > LAST_VALUE(promotion.promotion_category), \ > LAST_VALUE(promotion.promo_type), \ > LAST_VALUE(promotion.promo_sub_type), \ > LAST_VALUE(promotion.last_updated_at) as promo_last_updated_at, \ > LAST_VALUE(promotion.promotion_cost) \ > from \ > item \ > join \ > header \ > on item.order_id = header.id \ > left join \ > delivery \ > on item.order_id = delivery.order_id \ > left join \ > promotion \ > on item.id =promotion.order_item_id \ > group by header.id,item.id > 在Flink WEB UI 上发现程序反压很严重,而且时不时挂掉: > https://pic.imgdb.cn/item/63d8bebbface21e9ef3c92fe.jpg > > 参考了京东的一篇文章 > https://flink-learning.org.cn/article/detail/1e86b8b38faaeefd5ed7f70858aa40bc > ,对相关参数做了调整,但是发现有些功能在Flink 1.16中已经做了相关优化了,同时加了这些参数之后对程序没有起到任何优化的作用。 > > conf.setString("table.exec.mini-batch.enabled", "true"); > conf.setString("table.exec.mini-batch.allow-latency", "15 s"); > conf.setString("table.exec.mini-batch.size", "5000"); > conf.setString("table.exec.state.ttl", "86400 s"); > conf.setString("table.exec.disabled-operators", "NestedLoopJoin"); > conf.setString("table.optimizer.join.broadcast-threshold", "-1"); > conf.setString("table.optimizer.multiple-input-enabled", "true"); > conf.setString("table.exec.shuffle-mode", "POINTWISE_EDGES_PIPELINED"); > conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "8"); > 想请教下,针对Flink SQL如何处理反压,同时有什么其他的优化手段? > > > >
回复: Flink SQL 如何优化以及处理反压
你可以测试不写入clickhouse是否还存在反压,如果不是因为写入瓶颈的话就从你的处理逻辑优化了 发件人: lxk 发送时间: 2023年1月31日 15:16 收件人: user-zh@flink.apache.org 主题: Flink SQL 如何优化以及处理反压 Flink版本:1.16.0 目前在使用Flink SQL进行多流关联,并写入Clickhouse中 具体代码如下: select \ header.id as id, \ LAST_VALUE(header.order_status), \ LAST_VALUE(header.customer_id), \ LAST_VALUE(header.shop_id), \ LAST_VALUE(header.parent_order_id), \ LAST_VALUE(header.order_at), \ LAST_VALUE(header.pay_at), \ LAST_VALUE(header.channel_id), \ LAST_VALUE(header.root_order_id), \ LAST_VALUE(header.last_updated_at), \ item.id as item_id, \ LAST_VALUE(item.order_id) as order_id, \ LAST_VALUE(item.row_num), \ LAST_VALUE(item.goods_id), \ LAST_VALUE(item.s_sku_code), \ LAST_VALUE(item.qty), \ LAST_VALUE(item.p_paid_sub_amt), \ LAST_VALUE(item.p_sp_sub_amt), \ LAST_VALUE(item.bom_type), \ LAST_VALUE(item.last_updated_at) as item_last_updated_at, \ LAST_VALUE(item.display_qty), \ LAST_VALUE(delivery.del_type), \ LAST_VALUE(delivery.time_slot_type), \ LAST_VALUE(delivery.time_slot_date), \ LAST_VALUE(delivery.time_slot_time_from), \ LAST_VALUE(delivery.time_slot_time_to), \ LAST_VALUE(delivery.sku_delivery_type), \ LAST_VALUE(delivery.last_updated_at) as del_last_updated_at, \ LAST_VALUE(promotion.id) as promo_id, \ LAST_VALUE(promotion.order_item_id), \ LAST_VALUE(promotion.p_promo_amt), \ LAST_VALUE(promotion.promotion_category), \ LAST_VALUE(promotion.promo_type), \ LAST_VALUE(promotion.promo_sub_type), \ LAST_VALUE(promotion.last_updated_at) as promo_last_updated_at, \ LAST_VALUE(promotion.promotion_cost) \ from \ item \ join \ header \ on item.order_id = header.id \ left join \ delivery \ on item.order_id = delivery.order_id \ left join \ promotion \ on item.id =promotion.order_item_id \ group by header.id,item.id 在Flink WEB UI 上发现程序反压很严重,而且时不时挂掉: https://pic.imgdb.cn/item/63d8bebbface21e9ef3c92fe.jpg 参考了京东的一篇文章https://flink-learning.org.cn/article/detail/1e86b8b38faaeefd5ed7f70858aa40bc ,对相关参数做了调整,但是发现有些功能在Flink 1.16中已经做了相关优化了,同时加了这些参数之后对程序没有起到任何优化的作用。 conf.setString("table.exec.mini-batch.enabled", "true"); conf.setString("table.exec.mini-batch.allow-latency", "15 s"); conf.setString("table.exec.mini-batch.size", "5000"); conf.setString("table.exec.state.ttl", "86400 s"); conf.setString("table.exec.disabled-operators", "NestedLoopJoin"); conf.setString("table.optimizer.join.broadcast-threshold", "-1"); conf.setString("table.optimizer.multiple-input-enabled", "true"); conf.setString("table.exec.shuffle-mode", "POINTWISE_EDGES_PIPELINED"); conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "8"); 想请教下,针对Flink SQL如何处理反压,同时有什么其他的优化手段?
Flink SQL 如何优化以及处理反压
Flink版本:1.16.0 目前在使用Flink SQL进行多流关联,并写入Clickhouse中 具体代码如下: select \ header.id as id, \ LAST_VALUE(header.order_status), \ LAST_VALUE(header.customer_id), \ LAST_VALUE(header.shop_id), \ LAST_VALUE(header.parent_order_id), \ LAST_VALUE(header.order_at), \ LAST_VALUE(header.pay_at), \ LAST_VALUE(header.channel_id), \ LAST_VALUE(header.root_order_id), \ LAST_VALUE(header.last_updated_at), \ item.id as item_id, \ LAST_VALUE(item.order_id) as order_id, \ LAST_VALUE(item.row_num), \ LAST_VALUE(item.goods_id), \ LAST_VALUE(item.s_sku_code), \ LAST_VALUE(item.qty), \ LAST_VALUE(item.p_paid_sub_amt), \ LAST_VALUE(item.p_sp_sub_amt), \ LAST_VALUE(item.bom_type), \ LAST_VALUE(item.last_updated_at) as item_last_updated_at, \ LAST_VALUE(item.display_qty), \ LAST_VALUE(delivery.del_type), \ LAST_VALUE(delivery.time_slot_type), \ LAST_VALUE(delivery.time_slot_date), \ LAST_VALUE(delivery.time_slot_time_from), \ LAST_VALUE(delivery.time_slot_time_to), \ LAST_VALUE(delivery.sku_delivery_type), \ LAST_VALUE(delivery.last_updated_at) as del_last_updated_at, \ LAST_VALUE(promotion.id) as promo_id, \ LAST_VALUE(promotion.order_item_id), \ LAST_VALUE(promotion.p_promo_amt), \ LAST_VALUE(promotion.promotion_category), \ LAST_VALUE(promotion.promo_type), \ LAST_VALUE(promotion.promo_sub_type), \ LAST_VALUE(promotion.last_updated_at) as promo_last_updated_at, \ LAST_VALUE(promotion.promotion_cost) \ from \ item \ join \ header \ on item.order_id = header.id \ left join \ delivery \ on item.order_id = delivery.order_id \ left join \ promotion \ on item.id =promotion.order_item_id \ group by header.id,item.id 在Flink WEB UI 上发现程序反压很严重,而且时不时挂掉: https://pic.imgdb.cn/item/63d8bebbface21e9ef3c92fe.jpg 参考了京东的一篇文章https://flink-learning.org.cn/article/detail/1e86b8b38faaeefd5ed7f70858aa40bc ,对相关参数做了调整,但是发现有些功能在Flink 1.16中已经做了相关优化了,同时加了这些参数之后对程序没有起到任何优化的作用。 conf.setString("table.exec.mini-batch.enabled", "true"); conf.setString("table.exec.mini-batch.allow-latency", "15 s"); conf.setString("table.exec.mini-batch.size", "5000"); conf.setString("table.exec.state.ttl", "86400 s"); conf.setString("table.exec.disabled-operators", "NestedLoopJoin"); conf.setString("table.optimizer.join.broadcast-threshold", "-1"); conf.setString("table.optimizer.multiple-input-enabled", "true"); conf.setString("table.exec.shuffle-mode", "POINTWISE_EDGES_PIPELINED"); conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "8"); 想请教下,针对Flink SQL如何处理反压,同时有什么其他的优化手段?