Re: Re: Flink job state file directory on Aliyun OSS cannot be opened

2023-03-21 post by Guojun Li
The number of state files depends on many factors: the job's parallelism, the size of individual state KV entries, the state update frequency, the granularity of the keys, and so on.

Best,
Guojun
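
As a reference for readers, the retention and incremental-checkpoint behavior discussed in this thread is plain Flink configuration. A minimal sketch, assuming the DataStream Java API (the option keys are standard Flink options; the values are illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration conf = new Configuration();
    conf.setString("state.backend", "rocksdb");
    // Incremental checkpoints upload only new RocksDB SST files and keep
    // referencing older ones, which is why the share/ directory accumulates
    // a large number of files over time.
    conf.setBoolean("state.backend.incremental", true);
    // How many completed checkpoints to retain (3 in this thread).
    conf.setInteger("state.checkpoints.num-retained", 3);
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(conf);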

On Wed, Mar 22, 2023 at 9:43 AM Shammon FY  wrote:

> Then you may need to verify the following about these state files:
> 1. Do they actually belong to this job?
> 2. Are they from successful checkpoints or failed ones?
> 3. Did checkpoint cleanup fail? Check the logs for related errors.
>
> Best,
> Shammon FY
>
> On Wed, Mar 22, 2023 at 8:51 AM casel.chen  wrote:
>
> > Already checked; the current value of `state.checkpoints.num-retained` is 3.
> >
> >
> > On 2023-03-21 20:05:35, "Shammon FY" wrote:
> > >Hi
> > >
> > >You can check the checkpoint setting `state.checkpoints.num-retained`: is the number of retained checkpoints too large?
> > >
> > >Best,
> > >Shammon FY
> > >
> > >
> > >On Tue, Mar 21, 2023 at 11:55 AM casel.chen  wrote:
> > >
> > >> I have a Flink CDC job that joins multiple tables into a wide table. The job state has grown
> > >> to around 20 GB, and the remote state storage is Aliyun OSS. Today the job failed, and while
> > >> trying to restore manually from a checkpoint I found that the checkpoint directory holding
> > >> the job state (the share directory) could not be opened in a browser; listing it from the
> > >> command line showed tens of thousands of files. The job uses the RocksDB state backend with
> > >> incremental checkpointing enabled. Is there any way to fix this? Are all these files under
> > >> the share directory leftovers from incremental checkpoints?
> >
>


Re: Re: Re: Re: Flink on YARN: questions about recovery after an unexpected power outage

2023-03-13 post by Guojun Li
Hi

Could you confirm whether the last modification times of these HA files are identical or staggered?

Also, have you tried restoring from a specific chk- directory? Can the job recover that way?

Best,
Guojun
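
For reference, both checks and the restore suggested above can be run from the command line. A rough sketch, reusing the HDFS paths, YARN application ID, and job/checkpoint IDs that appear in the logs quoted below; <your-job.jar> and the choice of chk-7086 are placeholders:

    # compare the last modification times of the HA files
    hdfs dfs -ls /tmp/flink/ha/application_1678102326043_0007/
    # check the files' blocks for corruption after the power loss
    hdfs fsck /tmp/flink/ha/application_1678102326043_0007/ -files -blocks
    # try resuming from one specific retained checkpoint
    flink run -s hdfs:///tmp/flink/checkpoint/3844b96b002601d932e66233dd46899c/chk-7086 <your-job.jar>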

On Fri, Mar 10, 2023 at 11:56 AM guanyq  wrote:

> The Flink HA path is /tmp/flink/ha/
> The Flink chk path is /tmp/flink/checkpoint
>
>
> I am not sure yet whether it is the HA file that is corrupted or all of the chk files; I still need to simulate this to verify.
>
>
>
>
> It does try to recover from the 10 chk entries; the log shows:
> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Recovering checkpoints from ZooKeeper.
> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Found 10 checkpoints in ZooKeeper.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to fetch 10 checkpoints from storage.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7079.
> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7080.
> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7081.
> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7082.
> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7083.
> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7084.
> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7085.
> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7086.
>
>
>
> The detailed log follows (I omitted the repeated parts later on; the only difference is that it tries to start from a different /tmp/flink/ha/application_1678102326043_0007/completedCheckpointxx):
> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl
> - Starting the SlotManager.
> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.jobmaster.JobMaster -
> Successfully ran initialization on master in 0 ms.
> 2023-03-07 18:37:43,660 INFO org.apache.flink.runtime.util.ZooKeeperUtils -
> Initialized ZooKeeperCompletedCheckpointStore in
> '/checkpoints/3844b96b002601d932e66233dd46899c'.
> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster -
> Using application-defined state backend: File State Backend (checkpoints:
> 'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED,
> fileStateThreshold: -1)
> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster -
> Configuring application-defined state backend with job/cluster config
> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Recovering checkpoints from ZooKeeper.
> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Found 10 checkpoints in ZooKeeper.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to fetch 10 checkpoints from storage.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7079.
> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7080.
> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7081.
> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7082.
> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7083.
> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7084.
> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7085.
> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7086.
> 2023-03-07 18:37:43,979 WARN org.apache.hadoop.hdfs.BlockReaderFactory - I/O
> error constructing remote block reader.
> java.io.IOException: Got error, status message opReadBlock
> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
> received exception
> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
> The meta file length 0 is less than the expected length 7, for
> OP_READ_BLOCK, self=/192.168.200.23:45534, remote=/192.168.200.21:9866,
> for file
> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
> for pool BP-1003103929-192.168.200.11-1668473836936 block
> 1301252639_227512278
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.DataTran

Re: Re: How to optimize Flink SQL and deal with backpressure

2023-03-01 post by Guojun Li
Check whether the backpressured operators all sit on the same machine (to rule out a single-point problem), e.g. RocksDB on HDD disks, an overloaded host, or a full disk.
If it is not a single-point problem, take a jstack dump to see what the corresponding threads are actually doing, then optimize the logic accordingly.
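
A rough sketch of that jstack workflow on the suspected host; process discovery varies by deployment, and the grep pattern is illustrative (Flink names task threads after the operator description, so the aggregation operator's name should appear in the dump):

    jps -l | grep -i taskmanager                 # find the TaskManager JVM pid
    jstack <pid> > tm-threads.txt                # capture a thread dump
    grep -A 20 "GroupAggregate" tm-threads.txt   # inspect the hot task threads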

On Tue, Jan 31, 2023 at 6:01 PM lxk  wrote:

> From the web UI, the bottleneck is mainly the deduplication logic after the GROUP BY aggregation.
> Also, parallelism in SQL is set globally; there is no way to set it for one specific operator, and with higher parallelism resources start to feel tight.
>
> On 2023-01-31 17:45:15, "weijie guo" wrote:
> >Best to first find the bottleneck operator that is slowing down downstream processing and increase its parallelism moderately. If that does not help, check the jstack output; the logic may need to be changed.
> >
> >Best regards,
> >
> >Weijie
> >
> >
> >ssmq <374060...@qq.com.invalid> wrote on Tue, Jan 31, 2023 at 17:22:
> >
> >> You can test whether the backpressure is still there when you do not write to ClickHouse; if the sink is not the bottleneck, start optimizing your processing logic.
> >>
> >>
> >> From: lxk
> >> Sent: 2023-01-31 15:16
> >> To: user-zh@flink.apache.org
> >> Subject: How to optimize Flink SQL and deal with backpressure
> >>
> >> Flink version: 1.16.0
> >> We are currently using Flink SQL to join multiple streams and write the result to ClickHouse.
> >> The code is as follows:
> >> select \
> >> header.id as id, \
> >> LAST_VALUE(header.order_status), \
> >> LAST_VALUE(header.customer_id), \
> >> LAST_VALUE(header.shop_id), \
> >> LAST_VALUE(header.parent_order_id), \
> >> LAST_VALUE(header.order_at), \
> >> LAST_VALUE(header.pay_at), \
> >> LAST_VALUE(header.channel_id), \
> >> LAST_VALUE(header.root_order_id), \
> >> LAST_VALUE(header.last_updated_at), \
> >> item.id as item_id, \
> >> LAST_VALUE(item.order_id) as order_id, \
> >> LAST_VALUE(item.row_num), \
> >> LAST_VALUE(item.goods_id), \
> >> LAST_VALUE(item.s_sku_code), \
> >> LAST_VALUE(item.qty), \
> >> LAST_VALUE(item.p_paid_sub_amt), \
> >> LAST_VALUE(item.p_sp_sub_amt), \
> >> LAST_VALUE(item.bom_type), \
> >> LAST_VALUE(item.last_updated_at) as item_last_updated_at, \
> >> LAST_VALUE(item.display_qty), \
> >> LAST_VALUE(delivery.del_type), \
> >> LAST_VALUE(delivery.time_slot_type), \
> >> LAST_VALUE(delivery.time_slot_date), \
> >> LAST_VALUE(delivery.time_slot_time_from), \
> >> LAST_VALUE(delivery.time_slot_time_to), \
> >> LAST_VALUE(delivery.sku_delivery_type), \
> >> LAST_VALUE(delivery.last_updated_at) as del_last_updated_at, \
> >> LAST_VALUE(promotion.id) as promo_id, \
> >> LAST_VALUE(promotion.order_item_id), \
> >> LAST_VALUE(promotion.p_promo_amt), \
> >> LAST_VALUE(promotion.promotion_category), \
> >> LAST_VALUE(promotion.promo_type), \
> >> LAST_VALUE(promotion.promo_sub_type), \
> >> LAST_VALUE(promotion.last_updated_at) as promo_last_updated_at, \
> >> LAST_VALUE(promotion.promotion_cost) \
> >> from \
> >>   item \
> >>   join \
> >>   header  \
> >>   on item.order_id = header.id \
> >>   left join \
> >>   delivery \
> >>   on item.order_id = delivery.order_id \
> >>   left join \
> >>   promotion \
> >>   on item.id =promotion.order_item_id \
> >>   group by header.id,item.id
> >> The Flink web UI shows severe backpressure, and the job dies from time to time:
> >> https://pic.imgdb.cn/item/63d8bebbface21e9ef3c92fe.jpg
> >>
> >> We consulted an article from JD.com,
> >> https://flink-learning.org.cn/article/detail/1e86b8b38faaeefd5ed7f70858aa40bc
> >> and adjusted the related parameters, but some of those features are already optimized in
> >> Flink 1.16, and adding the parameters did not improve the job at all.
> >>
> >> conf.setString("table.exec.mini-batch.enabled", "true");
> >> conf.setString("table.exec.mini-batch.allow-latency", "15 s");
> >> conf.setString("table.exec.mini-batch.size", "5000");
> >> conf.setString("table.exec.state.ttl", "86400 s");
> >> conf.setString("table.exec.disabled-operators", "NestedLoopJoin");
> >> conf.setString("table.optimizer.join.broadcast-threshold", "-1");
> >> conf.setString("table.optimizer.multiple-input-enabled", "true");
> >> conf.setString("table.exec.shuffle-mode", "POINTWISE_EDGES_PIPELINED");
> >> conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "8");
> >> So my question: how should backpressure be handled for Flink SQL, and what other optimization options are there?
> >>
> >>
> >>
> >>
>


Re: Re: Re: How to determine which sources in a pipeline are bounded or unbounded

2022-10-31 post by Guojun Li
When designing the platform, you could consider making streaming jobs and batch jobs separate entry points.


Best,
Guojun
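
As an aside for readers: the ScanTableSource pointer given further down this thread can be turned into a rough boundedness check. A hedged sketch, assuming you have already obtained the DynamicTableSource instance (for example via the FactoryUtil.createTableSource call quoted below):

    import org.apache.flink.table.connector.source.DynamicTableSource;
    import org.apache.flink.table.connector.source.ScanTableSource;
    import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;

    // Boundedness is exposed by the scan runtime provider, not by the Table itself.
    ScanTableSource scanSource = (ScanTableSource) dynamicTableSource;
    boolean bounded = scanSource
            .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE)
            .isBounded();
    // A pipeline can only be bounded if every one of its scan sources is bounded.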

On 2022-10-28 18:14:33, "junjie.m...@goupwith.com" wrote:
>This is where writing code and building a platform differ: the platform has to determine whether the pipeline generated from a block of SQL is bounded or unbounded. Bounded pipelines need to be exposed through a RESTful API so an external scheduler can trigger them periodically, while unbounded pipelines can simply run long-term.
>There are of course many other scenarios where you need to know whether a pipeline is bounded; I won't list them all here.
>
> 
>From: weijie guo
>Sent: 2022-10-28 18:01
>To: user-zh
>Subject: Re: Re: How to determine which sources in a pipeline are bounded or unbounded
>Then there is no way to get it from the Table API. I am a bit puzzled: why do you need to infer the execution mode automatically instead of setting it according to your job's actual situation?
>If you expect to run the job in batch mode but some of the sources are unbounded, I would say the choice of source is itself unreasonable and the code should be changed.
>Streaming and batch execution also differ in many ways, e.g. whether the sum operator emits multiple rows or a single row per key; these are things you need to weigh when choosing the execution mode. Even if automatic inference were supported, it could lead to a lot of unexpected behavior.
> 
>Best regards,
> 
>Weijie
> 
> 
>junjie.m...@goupwith.com wrote on Fri, Oct 28, 2022 at 17:51:
> 
>>
>> I am on Flink 1.14.5.
>>
>> EnvironmentSettings.fromConfiguration(ReadableConfig configuration) {
>>     final Builder builder = new Builder();
>>     switch (configuration.get(RUNTIME_MODE)) {
>>         case STREAMING:
>>             builder.inStreamingMode();
>>             break;
>>         case BATCH:
>>             builder.inBatchMode();
>>             break;
>>         case AUTOMATIC:
>>         default:
>>             throw new TableException(
>>                     String.format(
>>                             "Unsupported mode '%s' for '%s'. "
>>                                     + "Only an explicit BATCH or STREAMING mode is supported in Table API.",
>>                             configuration.get(RUNTIME_MODE),
>>                             RUNTIME_MODE.key()));
>>     }
>> }
>> This restriction is what rejects AUTOMATIC.
>>
>>
>> From: TonyChen
>> Sent: 2022-10-28 17:13
>> To: user-zh
>> Subject: Re: How to determine which sources in a pipeline are bounded or unbounded
>> Bump to a newer patch release; 1.14.3 already has AUTOMATIC.
>>
>>
>> Best,
>> TonyChen
>>
>> > On Oct 28, 2022, at 17:09, junjie.m...@goupwith.com wrote:
>> >
>> > hi, weijie:
>> > I am using Flink 1.14, where setting execution.runtime-mode=AUTOMATIC is not supported; it reports the following error:
>> > org.apache.flink.table.api.TableException: Unsupported mode 'AUTOMATIC'
>> for 'execution.runtime-mode'. Only an explicit BATCH or STREAMING mode is
>> supported in Table API.
>> >
>> > Has execution.runtime-mode=AUTOMATIC become supported in later versions?
>> >
>> >
>> > From: weijie guo
>> > Sent: 2022-10-28 16:38
>> > To: user-zh
>> > Subject: Re: Re: How to determine which sources in a pipeline are bounded or unbounded
>> > For this requirement you can set execution.runtime-mode to AUTOMATIC;
>> > that mode is exactly what you want: if all Sources are bounded, the batch execution mode is used, otherwise streaming.
>> >
>> > Best regards,
>> >
>> > Weijie
>> >
>> >
>> > junjie.m...@goupwith.com wrote on Fri, Oct 28, 2022 at 15:56:
>> >
>> >> hi, Weijie:
>> >>
>> >> Since I have not found any way to tell whether a pipeline is bounded or unbounded, especially for Table & SQL where execution.runtime-mode=BATCH or STREAMING must be specified manually,
>> >> I want to derive the pipeline's boundedness from all of its sources: if every scan source is bounded, the pipeline must be bounded; otherwise it is unbounded.
>> >>
>> >>
>> >> From: weijie guo
>> >> Sent: 2022-10-28 15:44
>> >> To: user-zh
>> >> Subject: Re: Re: How to determine which sources in a pipeline are bounded or unbounded
>> >> Hi, junjie:
>> >>
>> >> I'd like to understand your goal first: why do you need to determine a Source's boundedness in the Table API, and how does that information help your scenario?
>> >>
>> >> Best regards,
>> >>
>> >> Weijie
>> >>
>> >>
>> >> junjie.m...@goupwith.com wrote on Fri, Oct 28, 2022 at 15:36:
>> >>
>> >>> public static DynamicTableSource FactoryUtil.createTableSource(@Nullable Catalog catalog,
>> >>> ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable,
>> >>> ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary)
>> >>> can produce one, but it requires too many parameters, and I don't know how to obtain them.
>> >>>
>> >>> From: junjie.m...@goupwith.com
>> >>> Sent: 2022-10-28 15:33
>> >>> To: user-zh
>> >>> Subject: Re: Re: How to determine which sources in a pipeline are bounded or unbounded
>> >>> The question is how to get hold of a ScanTableSource in the first place; so far I have not found where to obtain one.
>> >>>
>> >>>
>> >>> From: TonyChen
>> >>> Sent: 2022-10-28 15:21
>> >>> To: user-zh
>> >>> Subject: Re: How to determine which sources in a pipeline are bounded or unbounded
>> >>> Maybe take a look at this:
>> >>>
>> >>>
>> >>
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/ScanTableSource.java#L94
>> >>> Best,
>> >>> TonyChen
>>  On Oct 28, 2022, at 15:12, junjie.m...@goupwith.com wrote:
>> 
>>  Hi all:
>>    I have a question: how do I determine whether a Table is bounded or unbounded, or, via the TableEnv, obtain the source tables' information including whether each is bounded or unbounded?
>> 
>> >>>
>> >>
>>
>>
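
Closing note for this thread: on Table API versions that reject AUTOMATIC (as the quoted fromConfiguration code shows), Guojun's suggestion of separate entry points amounts to an explicit switch when the platform builds the environment. A minimal sketch; the isBatch flag is hypothetical, e.g. the result of the per-source boundedness check sketched earlier in this thread:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    // isBatch is a hypothetical flag decided by the platform, not a Flink API.
    EnvironmentSettings settings = isBatch
            ? EnvironmentSettings.newInstance().inBatchMode().build()
            : EnvironmentSettings.newInstance().inStreamingMode().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);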