Re: Re: Flink job state directory on Aliyun OSS cannot be opened
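The number of state files depends on many factors, such as the job's parallelism, the size of a single state key/value entry, how often the state is updated, and the granularity of the keys.

Best,
Guojun

On Wed, Mar 22, 2023 at 9:43 AM Shammon FY wrote:
> Then you may need to confirm the following about these state files:
> 1. Whether they really belong to this job
> 2. Whether they come from successful or failed checkpoints
> 3. Whether something went wrong while cleaning up checkpoints; check for related error logs
>
> Best,
> Shammon FY
>
> On Wed, Mar 22, 2023 at 8:51 AM casel.chen wrote:
> > I checked: the current value of `state.checkpoints.num-retained` is 3.
> >
> > On 2023-03-21 20:05:35, "Shammon FY" wrote:
> > >Hi
> > >
> > >You could check the checkpoint setting `state.checkpoints.num-retained`. Are too many checkpoints being retained?
> > >
> > >Best,
> > >Shammon FY
> > >
> > >On Tue, Mar 21, 2023 at 11:55 AM casel.chen wrote:
> > >
> > >> We have a Flink CDC job that joins multiple tables into a wide table. Its state has reached about 20 GB, and the remote state storage is Aliyun OSS.
> > >> Today the job failed. When I tried to restore it manually from a checkpoint, I found that the checkpoint directory holding the job state (the share directory) could not be opened in the browser. Listing it from the command line showed more than ten thousand files.
> > >> The job uses the RocksDB state backend with incremental checkpoints enabled. Is there any way to solve this? Are so many files in the share directory left over from incremental checkpoints?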
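To see which part of the checkpoint directory the tens of thousands of files belong to, one option is to list every object key recursively and count the keys per sub-directory under the job ID. A command-line OSS client could produce such a listing, but the tool and its output format are assumptions here.

The following is a minimal sketch that assumes Flink's usual layout: `<checkpoint-dir>/<job-id>/` containing `chk-<n>`, `shared` and `taskowned`. The class and method names are hypothetical. With incremental RocksDB checkpoints, SST files are written to `shared` and can be referenced by several retained checkpoints at once. A large `shared` count alone therefore does not prove that files were leaked.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not a Flink or OSS API): groups object keys from a
// recursive listing by the directory directly below the job-ID directory,
// e.g. "chk-7086", "shared", "taskowned".
class CheckpointLayout {

    /**
     * @param jobDirPrefix prefix up to and including the job-ID directory,
     *                     e.g. "flink/checkpoints/<job-id>/" (assumed format)
     * @param keys         object keys from a recursive listing
     * @return number of objects per top-level sub-directory, sorted by name
     */
    static Map<String, Integer> countBySubDir(String jobDirPrefix, List<String> keys) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String key : keys) {
            if (!key.startsWith(jobDirPrefix)) {
                continue; // belongs to another job or path, ignore it
            }
            String rest = key.substring(jobDirPrefix.length());
            int slash = rest.indexOf('/');
            // Objects stored directly under the job directory are grouped as "<root>"
            String group = slash < 0 ? "<root>" : rest.substring(0, slash);
            counts.merge(group, 1, Integer::sum);
        }
        return counts;
    }

    /** Number of distinct "chk-N" directories found in the grouped counts. */
    static long retainedCheckpoints(Map<String, Integer> counts) {
        return counts.keySet().stream().filter(k -> k.startsWith("chk-")).count();
    }
}
```

Suppose the number of `chk-` directories roughly matches `state.checkpoints.num-retained` (plus any checkpoint still in progress) while `shared` holds almost everything. In that case, the file count is more likely driven by the factors listed above (parallelism, entry size, update frequency, key granularity) than by extra retained checkpoints.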
Re: Re: Re: Re: Question about Flink on YARN after an unexpected power outage
Hi, can you check whether the last modification times of these HA files are all the same or staggered?
Also, have you tried restoring from a specific chk- directory? Does that work?

Best,
Guojun

On Fri, Mar 10, 2023 at 11:56 AM guanyq wrote:
> The Flink HA path is /tmp/flink/ha/
> The Flink checkpoint path is /tmp/flink/checkpoint
>
> I'm not sure yet whether it is this HA file that is corrupted or all the chk directories. I need to reproduce it to verify.
>
> It tries to recover from the 10 checkpoints. The log shows:
> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
>
>
> The detailed log is below. I omitted the repeated part at the end; the only difference is that each attempt starts from a different /tmp/flink/ha/application_1678102326043_0007/completedCheckpointxx
> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Starting the SlotManager.
> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms.
> 2023-03-07 18:37:43,660 INFO org.apache.flink.runtime.util.ZooKeeperUtils - Initialized ZooKeeperCompletedCheckpointStore in '/checkpoints/3844b96b002601d932e66233dd46899c'.
> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1)
> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Configuring application-defined state backend with job/cluster config
> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
> 2023-03-07 18:37:43,979 WARN org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
> java.io.IOException: Got error, status message opReadBlock BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278 received exception org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException: The meta file length 0 is less than the expected length 7, for OP_READ_BLOCK, self=/192.168.200.23:45534, remote=/192.168.200.21:9866, for file /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a, for pool BP-1003103929-192.168.200.11-1668473836936 block 1301252639_227512278
>     at org.apache.hadoop.hdfs.protocol.datatransfer.DataTran
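guanyq's log shows the checkpoint store walking through the checkpoints found in ZooKeeper until reading one HA file fails. The following is a minimal sketch for narrowing this down from a full JobManager log. It takes the ID from the last `Trying to retrieve checkpoint N.` line before the first line that mentions an exception.

The sketch makes these assumptions:
* The message text appears verbatim, as in the excerpt.
* It works on plain text only, with no HDFS or ZooKeeper access.
* The class name is hypothetical.

The result only points to the suspect checkpoint. The exception itself names the HA file that could not be read.

```java
import java.util.List;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: scans log lines in order and returns the ID of the last
// "Trying to retrieve checkpoint N." message seen before the first line that
// contains "Exception". Returns empty if no such line is found.
class CheckpointRecoveryLog {
    private static final Pattern RETRIEVE =
            Pattern.compile("Trying to retrieve checkpoint (\\d+)\\.");

    static OptionalLong lastAttemptBeforeError(List<String> lines) {
        OptionalLong last = OptionalLong.empty();
        for (String line : lines) {
            Matcher m = RETRIEVE.matcher(line);
            if (m.find()) {
                last = OptionalLong.of(Long.parseLong(m.group(1)));
            } else if (line.contains("Exception")) {
                // First error: the checkpoint being fetched at this point is the suspect
                return last;
            }
        }
        return OptionalLong.empty(); // no error found in these lines
    }
}
```

An unrelated line mentioning "Exception" would also stop the scan. For that reason, the input should be limited to the recovery section of the log.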
Re: Re: How to optimize Flink SQL and handle backpressure
Check whether the backpressured operators all run on the same machine, to rule out a single-machine problem. Examples include RocksDB on HDD disks, an overloaded host, or a full disk.
If it is not a single-machine problem, take a jstack dump to see what the corresponding threads are actually doing, then optimize the logic accordingly.

On Tue, Jan 31, 2023 at 6:01 PM lxk wrote:
> Judging from the Web UI, the bottleneck is mainly the deduplication logic after the GROUP BY aggregation.
> Also, the SQL parallelism is set globally, so I can't set the parallelism of one specific operator. With higher parallelism, resources get tight.
>
>
> On 2023-01-31 17:45:15, "weijie guo" wrote:
> >It's best to first find the bottleneck operator that slows down downstream processing and increase its parallelism appropriately. If that doesn't help, look at jstack; the logic may need to change.
> >
> >Best regards,
> >
> >Weijie
> >
> >
> >ssmq <374060...@qq.com.invalid> wrote on Tue, Jan 31, 2023 at 17:22:
> >
> >> You can test whether the backpressure persists without writing to ClickHouse. If the write is not the bottleneck, optimize your processing logic.
> >>
> >>
> >> From: lxk
> >> Sent: January 31, 2023 15:16
> >> To: user-zh@flink.apache.org
> >> Subject: How to optimize Flink SQL and handle backpressure
> >>
> >> Flink version: 1.16.0
> >> We are using Flink SQL to join multiple streams and write the result into ClickHouse.
> >> The code is as follows:
> >> select \
> >> header.id as id, \
> >> LAST_VALUE(header.order_status), \
> >> LAST_VALUE(header.customer_id), \
> >> LAST_VALUE(header.shop_id), \
> >> LAST_VALUE(header.parent_order_id), \
> >> LAST_VALUE(header.order_at), \
> >> LAST_VALUE(header.pay_at), \
> >> LAST_VALUE(header.channel_id), \
> >> LAST_VALUE(header.root_order_id), \
> >> LAST_VALUE(header.last_updated_at), \
> >> item.id as item_id, \
> >> LAST_VALUE(item.order_id) as order_id, \
> >> LAST_VALUE(item.row_num), \
> >> LAST_VALUE(item.goods_id), \
> >> LAST_VALUE(item.s_sku_code), \
> >> LAST_VALUE(item.qty), \
> >> LAST_VALUE(item.p_paid_sub_amt), \
> >> LAST_VALUE(item.p_sp_sub_amt), \
> >> LAST_VALUE(item.bom_type), \
> >> LAST_VALUE(item.last_updated_at) as item_last_updated_at, \
> >> LAST_VALUE(item.display_qty), \
> >> LAST_VALUE(delivery.del_type), \
> >> LAST_VALUE(delivery.time_slot_type), \
> >> LAST_VALUE(delivery.time_slot_date), \
> >> LAST_VALUE(delivery.time_slot_time_from), \
> >> LAST_VALUE(delivery.time_slot_time_to), \
> >> LAST_VALUE(delivery.sku_delivery_type), \
> >> LAST_VALUE(delivery.last_updated_at) as del_last_updated_at, \
> >> LAST_VALUE(promotion.id) as promo_id, \
> >> LAST_VALUE(promotion.order_item_id), \
> >> LAST_VALUE(promotion.p_promo_amt), \
> >> LAST_VALUE(promotion.promotion_category), \
> >> LAST_VALUE(promotion.promo_type), \
> >> LAST_VALUE(promotion.promo_sub_type), \
> >> LAST_VALUE(promotion.last_updated_at) as promo_last_updated_at, \
> >> LAST_VALUE(promotion.promotion_cost) \
> >> from \
> >> item \
> >> join \
> >> header \
> >> on item.order_id = header.id \
> >> left join \
> >> delivery \
> >> on item.order_id = delivery.order_id \
> >> left join \
> >> promotion \
> >> on item.id = promotion.order_item_id \
> >> group by header.id,item.id
> >> The Flink Web UI shows severe backpressure, and the job crashes from time to time:
> >> https://pic.imgdb.cn/item/63d8bebbface21e9ef3c92fe.jpg
> >>
> >> I followed an article from JD.com
> >> https://flink-learning.org.cn/article/detail/1e86b8b38faaeefd5ed7f70858aa40bc
> >> and tuned the related parameters. However, some of these features are already optimized in Flink 1.16, and adding these parameters did not improve the job at all.
> >>
> >> conf.setString("table.exec.mini-batch.enabled", "true");
> >> conf.setString("table.exec.mini-batch.allow-latency", "15 s");
> >> conf.setString("table.exec.mini-batch.size", "5000");
> >> conf.setString("table.exec.state.ttl", "86400 s");
> >> conf.setString("table.exec.disabled-operators", "NestedLoopJoin");
> >> conf.setString("table.optimizer.join.broadcast-threshold", "-1");
> >> conf.setString("table.optimizer.multiple-input-enabled", "true");
> >> conf.setString("table.exec.shuffle-mode", "POINTWISE_EDGES_PIPELINED");
> >> conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "8");
> >> How should backpressure be handled in Flink SQL, and what other optimization techniques are there?
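The first suggestion in this thread is to check whether the backpressured or busy subtasks all sit on one machine. To do that, one can collect, per subtask, the TaskManager host and a busy or backpressure ratio, then aggregate them per host. The Web UI's subtask view is one possible source; how the numbers are exported is left as an assumption.

The following is a minimal sketch of that aggregation only. The `SubtaskStat` holder, the threshold parameter and the class names are hypothetical choices for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical data holder: one row per subtask of the suspected bottleneck operator.
final class SubtaskStat {
    final int subtask;
    final String host;
    final double busyRatio; // 0.0 .. 1.0, e.g. a busy or backpressure ratio copied from the Web UI

    SubtaskStat(int subtask, String host, double busyRatio) {
        this.subtask = subtask;
        this.host = host;
        this.busyRatio = busyRatio;
    }
}

class HotspotCheck {
    /** Counts, per host, how many subtasks are at or above the given ratio threshold. */
    static Map<String, Integer> hotSubtasksPerHost(List<SubtaskStat> stats, double threshold) {
        Map<String, Integer> perHost = new TreeMap<>();
        for (SubtaskStat s : stats) {
            if (s.busyRatio >= threshold) {
                perHost.merge(s.host, 1, Integer::sum);
            }
        }
        return perHost;
    }

    /**
     * True if all hot subtasks run on exactly one host, which hints at a
     * single-machine problem (slow or full disk, overloaded node).
     */
    static boolean concentratedOnOneHost(List<SubtaskStat> stats, double threshold) {
        return hotSubtasksPerHost(stats, threshold).size() == 1;
    }
}
```

If the hot subtasks are spread across hosts, the next step suggested in the thread is a jstack dump of the busy threads.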
Re: Re: Re: How to find out which sources in a pipeline are bounded and which are unbounded
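Perhaps, when designing the platform, you could give streaming jobs and batch jobs separate entry points.

Best,
Guojun

On 2022-10-28 18:14:33, "junjie.m...@goupwith.com" wrote:
>This is where writing code and building a platform have different needs. A platform needs to determine whether the pipeline generated from a submitted SQL block is bounded or unbounded. A bounded pipeline must expose a RESTful API so that an external scheduler can trigger it periodically, while an unbounded pipeline can simply be started and kept running.
>There are of course many other scenarios where you need to know whether a pipeline is bounded or unbounded; I won't list them all here.
>
>
>From: weijie guo
>Sent: 2022-10-28 18:01
>To: user-zh
>Subject: Re: Re: How to find out which sources in a pipeline are bounded and which are unbounded
>Then there is probably no way to get this in the Table API. I don't quite understand why you need to infer the execution mode automatically instead of setting it based on what your job actually does.
>If you expect to run the job in batch mode and some sources are unbounded, those sources are the wrong choice in the first place, and the code should be changed.
>Also, streaming and batch execution modes differ in many ways, for example whether a sum operator emits several results or a single result per key. You need to consider this when choosing the execution mode. Even if automatic inference were supported, letting the system decide could lead to many unexpected behaviors.
>
>Best regards,
>
>Weijie
>
>
>junjie.m...@goupwith.com wrote on Fri, Oct 28, 2022 at 17:51:
>
>>
>> I'm on Flink 1.14.5:
>>
>> EnvironmentSettings.fromConfiguration(ReadableConfig configuration) {
>>     final Builder builder = new Builder();
>>     switch (configuration.get(RUNTIME_MODE)) {
>>         case STREAMING:
>>             builder.inStreamingMode();
>>             break;
>>         case BATCH:
>>             builder.inBatchMode();
>>             break;
>>         case AUTOMATIC:
>>         default:
>>             throw new TableException(
>>                     String.format(
>>                             "Unsupported mode '%s' for '%s'. "
>>                                     + "Only an explicit BATCH or STREAMING mode is supported in Table API.",
>>                             configuration.get(RUNTIME_MODE),
>>                             RUNTIME_MODE.key()));
>>     }
>>
>> This code rejects AUTOMATIC.
>>
>>
>> From: TonyChen
>> Sent: 2022-10-28 17:13
>> To: user-zh
>> Subject: Re: How to find out which sources in a pipeline are bounded and which are unbounded
>> Upgrade to a newer minor version; 1.14.3 already has AUTOMATIC.
>>
>>
>> Best,
>> TonyChen
>>
>> > On Oct 28, 2022, at 17:09, junjie.m...@goupwith.com wrote:
>> >
>> > hi, weijie:
>> > I'm using Flink 1.14, which does not support setting execution.runtime-mode=AUTOMATIC. It fails with the following error:
>> > org.apache.flink.table.api.TableException: Unsupported mode 'AUTOMATIC' for 'execution.runtime-mode'. Only an explicit BATCH or STREAMING mode is supported in Table API.
>> >
>> > Do later versions support execution.runtime-mode=AUTOMATIC?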
>> >
>> >
>> > From: weijie guo
>> > Sent: 2022-10-28 16:38
>> > To: user-zh
>> > Subject: Re: Re: How to find out which sources in a pipeline are bounded and which are unbounded
>> > For this requirement you can set execution.runtime-mode to AUTOMATIC.
>> > That mode does exactly what you need: if all sources are bounded, batch execution mode is used; otherwise streaming mode is used.
>> >
>> > Best regards,
>> >
>> > Weijie
>> >
>> >
>> > junjie.m...@goupwith.com wrote on Fri, Oct 28, 2022 at 15:56:
>> >
>> >> hi, Weijie:
>> >>
>> >> I haven't found a way to determine whether a pipeline is bounded or unbounded. For Table & SQL in particular, execution.runtime-mode=BATCH or STREAMING has to be specified manually.
>> >>
>> >> I want to decide whether the whole pipeline is bounded by checking whether each source is bounded. If all scan sources are bounded, the pipeline must be bounded; otherwise it is unbounded.
>> >>
>> >>
>> >> From: weijie guo
>> >> Sent: 2022-10-28 15:44
>> >> To: user-zh
>> >> Subject: Re: Re: How to find out which sources in a pipeline are bounded and which are unbounded
>> >> Hi, junjie:
>> >>
>> >> First I'd like to understand your goal. Why do you need to determine a source's boundedness in the Table API, and how would this information help in your scenario?
>> >>
>> >> Best regards,
>> >>
>> >> Weijie
>> >>
>> >>
>> >> junjie.m...@goupwith.com wrote on Fri, Oct 28, 2022 at 15:36:
>> >>
>> >>> public static DynamicTableSource FactoryUtil.createTableSource(@Nullable Catalog catalog, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary)
>> >>> can return it, but it needs too many arguments, and I don't know how to obtain them.
>> >>>
>> >>> From: junjie.m...@goupwith.com
>> >>> Sent: 2022-10-28 15:33
>> >>> To: user-zh
>> >>> Subject: Re: Re: How to find out which sources in a pipeline are bounded and which are unbounded
>> >>> The question is how to obtain the ScanTableSource. So far I haven't found anywhere to get it.
>> >>>
>> >>>
>> >>> From: TonyChen
>> >>> Sent: 2022-10-28 15:21
>> >>> To: user-zh
>> >>> Subject: Re: How to find out which sources in a pipeline are bounded and which are unbounded
>> >>> Maybe take a look at this:
>> >>>
>> >>> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/ScanTableSource.java#L94
>> >>> Best,
>> >>> TonyChen
>> >>> On Oct 28, 2022, at 15:12, junjie.m...@goupwith.com wrote:
>> >>>
>> >>> Hello everyone:
>> >>> I have a question: how can I tell whether a Table is bounded or unbounded? Or, through the TableEnv, how can I get the source table information and whether each source is bounded?
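The rule junjie describes can be written down directly: the pipeline is bounded only if every source is bounded. weijie describes the same rule for AUTOMATIC mode.

The following is a stdlib-only sketch. In real code the per-source flag would come from the connector, for example `ScanTableSource.ScanRuntimeProvider#isBounded()` from the file TonyChen linked. The `Mode` enum here is a hypothetical stand-in for Flink's `RuntimeExecutionMode`. Treating a pipeline with no sources as unbounded is an arbitrary choice made for this sketch.

```java
import java.util.List;

// Hypothetical stand-in for org.apache.flink.api.common.RuntimeExecutionMode (BATCH / STREAMING only).
enum Mode { BATCH, STREAMING }

class PipelineBoundedness {
    /**
     * A pipeline is bounded only if it has at least one source and every source is bounded.
     * (An empty list is treated as unbounded; this is a choice made for the sketch.)
     */
    static boolean isBounded(List<Boolean> sourceBounded) {
        return !sourceBounded.isEmpty() && sourceBounded.stream().allMatch(b -> b);
    }

    /** Mirrors the AUTOMATIC rule from the thread: BATCH if all sources are bounded, else STREAMING. */
    static Mode chooseMode(List<Boolean> sourceBounded) {
        return isBounded(sourceBounded) ? Mode.BATCH : Mode.STREAMING;
    }
}
```

For a platform, the suggestion at the top of this thread (separate entry points for streaming and batch jobs) avoids having to infer this at all.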