Re: 关于flink @deprecated
Hi 你好,一般你看到 deprecated 的注释,同时会在 Java doc 中看到类似 "Use XX instead" 的语句,也就是建议使用后面的 XX 关于为什么会被 deprecated 可以查看下引入这个修改的 PR 或者相关的 JIRA Best, Congxian guanyq 于2020年4月4日周六 上午10:30写道: > > > > 辛苦了! > > > flink的一些方法或者类都被@deprecated修饰 > 1.如何找到相对应建议使用的方法或者类,来替换@deprecated修饰的类或方法? > 2.如何知道为什么这些@deprecated修饰的类或方法被弃用呢? > > > > > > > > > > >
关于flink @deprecated
辛苦了! flink的一些方法或者类都被@deprecated修饰 1.如何找到相对应建议使用的方法或者类,来替换@deprecated修饰的类或方法? 2.如何知道为什么这些@deprecated修饰的类或方法被弃用呢?
Re: 从savepoint不能恢复问题
Hi Savepoint 你可以理解为 OperatorId -> State 的一个映射[1],如果你不指定 OperatorId 则会随机生成一个,所以一般会需要用户指定一个 OperatorId。 另外你使用 SQL,可能 SQL 代码发生变更后,导致最终的作业 DAG 图已经变掉了,也就无法恢复了,首先,你可以看一下你两次执行的作业最终的 DAG 图是否完全一样,另外,可以看一下是否指定了 OperatorId [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/savepoints.html Best, Congxian LakeShen 于2020年4月3日周五 上午10:24写道: > Hi , > > 这种情况可能是你改变的 Flink SQL 的拓扑结构,导致部分算子的 uid 发生变化,然后在从状态恢复的时候,没有找到算子的状态。 > 所以在开发 SQL 任务的时候,一般更改 SQL 代码时,不要改变其拓扑结构,SQL 任务上线后,就不要在轻意改了。 > > Best, > LakeShen > > 酷酷的浑蛋 于2020年4月2日周四 下午6:31写道: > > > 关键我的程序是flink-sql,其它的算子基本都设置过uid了,flink-sql可以设置uid吗,或者说sql中的自动分配的uid怎么查找呢 > > > > > > | | > > apache22 > > | > > | > > apach...@163.com > > | > > 签名由网易邮箱大师定制 > > 在2020年4月2日 18:22,Yangze Guo 写道: > > 如果没有显示指定的话,operator id将是一个随机生成的值[1]. > > 当从savepoint恢复时,将依据这些id来匹配,如果发生了变化,可能是你修改了你的jobGraph。Flink推荐显示的指定operator > > id字面量。[2] > > > > [1] > > > https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#assigning-operator-ids > > [2] > > > https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#faq > > > > Best, > > Yangze Guo > > > > On Thu, Apr 2, 2020 at 6:01 PM 酷酷的浑蛋 wrote: > > > > > > > > Failed to rollback to checkpoint/savepoint > > hdfs://xxx/savepoint-9d5b7a-66c0340f6672. Cannot map checkpoint/savepoint > > state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, > > because the operator is not available in the new program. If you want to > > allow to skip this, you can set the --allowNonRestoredState option on the > > CLI. > > > > > > 像上面这个错误,我怎么根据 operator cbc357ccb763df2852fee8c4fc7d55f2 > 去找是我程序中的哪个算子不能恢复呢? > > | | > > apache22 > > | > > | > > apach...@163.com > > | > > 签名由网易邮箱大师定制 > > >
Re: 关于使用RocksDBStateBackend TTL 配置的问题
Hi 1 不参加 compaction 的不会被删除 2 TTL state 暂时无法做到一到时间就清除(我理解你这里担心不删除会有磁盘打满的风险),实际上现在会周期性的清理 state 的。另外如果你真的希望严格控制 state 的过期时间,或者你可以尝试下 ProcessFunction[1] 自己使用 Timer 相关的来控制 State 的生命周期 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html Best, Congxian yanggang_it_job 于2020年4月3日周五 上午11:14写道: > Hi: > > > 我们现在启用state.backend.rocksdb.ttl.compaction.filter.enabled进行rocksdb的有效期设置,但效果并不是那么理想。 >同时我也有以下问题想不明白: >1、如果rocksdb在compact的时候有些state并没有被compact到,是否就意味着就算这些state已经过期也不会被删除? > > 2、目前flink的ttl策略只有OnCreateAndWrite和OnReadAndWrite两种策略,是否有那种不需要刷新,到了TTL时间就自动清除。 > 否则就会出现state一直在刷新导致永远无法删除,最终导致磁盘打满 > > >目前我能想到的方案是,另外写一个定时任务根据配置去清除过期state。 >请问大家还有其他更好的方案吗?
Re: flink存在推的模式吗?
集群模式下server ip不能直接拿,比如在哪个taskmanager上执行。 如果实在想实现的话可以自己实现个SourceFunction. 在 2020/3/19 下午8:23,“512348363”<512348...@qq.com> 写入: 例如这个对应的 DataStream
Re: Re: rocksdb作为状态后端任务重启时,恢复作业失败Could not restore keyed state backend for KeyedProcessOperator
HDFS 上的路径和本地不一样,如果你要看 HDFS 路径的话,可能需要看 Checkjpoint Meta 的相关信息,这个比较麻烦,可以参考 CheckpointMetadataLoadingTest 的相关测试。 我再看了一下你给的 TM Log,看上去是 148 行的 outputStream.close() 出错了(有个比较奇怪的现象是,这里的 outputStream 是本地的文件,但是从错误栈看是 HadoopFileSystem)。你这个是稳定复现的问题吗?如果是的话,能否贴一下打开 debug log,贴一下 JM/TM log,另外能给一个可复现的 作业更好 Best, Congxian chenxyz 于2020年4月1日周三 下午5:18写道: > Hi, 从贤, > 我查看了下HDFS, > /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160下面是空的,也没有db这一层目录。 > > > > > > > > > 在 2020-04-01 16:50:13,"Congxian Qiu" 写道: > >Hi > >Restore 可以大致分为两部分,1)下载文件;2)从下载的文件恢复 > >从 TM 日志看像下载出错了,你可以看下 > > >/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst > >这个文件是不是存在 double check 下,如果是下载失败,你需要确认下下载失败的原因 > > > >Best, > >Congxian > > > > > >chenxyz 于2020年4月1日周三 下午3:02写道: > > > >> 任务启用rocksdb作为状态后端,任务出现异常重启时经常失败Could not restore keyed state backend for > >> KeyedProcessOperator。这个问题怎么解决呢? > >> > >> 版本:1.10 standalone > >> > >> 配置信息: > >> > >> state.backend: rocksdb > >> > >> state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint > >> > >> state.backend.incremental: true > >> > >> jobmanager.execution.failover-strategy: region > >> > >> io.tmp.dirs: /data/flink1_10/tmp > >> > >> > >> > >> > >> 任务的checkpoint配置: > >> > >> env.enableCheckpointing(2 * 60 * 1000); > >> > >> > >> > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > >> > >> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000); > >> > >> env.getCheckpointConfig().setCheckpointTimeout(6); > >> > >> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); > >> > >> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12); > >> > >> > >> > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > >> > >> > >> > >> > >> 日志信息: > >> > >> > >> > >> > >> 2020-04-01 11:13:03 > >> > >> java.lang.Exception: Exception while creating > StreamOperatorStateContext. > >> > >> at > >> > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191) > >> > >> at > >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) > >> > >> at > >> > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) > >> > >> at > >> > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) > >> > >> at > >> > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > >> > >> at > >> > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) > >> > >> at > >> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) > >> > >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > >> > >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > >> > >> at java.lang.Thread.run(Thread.java:748) > >> > >> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > >> state backend for > >> KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of > the > >> 1 provided restore options. > >> > >> at > >> > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > >> > >> at > >> > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304) > >> > >> at > >> > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131) > >> > >> ... 9 more > >> > >> Caused by: org.apache.flink.runtime.state.BackendBuildingException: > Caught > >> unexpected exception. > >> > >> at > >> > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336) > >> > >> at > >> > org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548) > >> > >> at > >> > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288) > >> > >> at > >> > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) > >> > >> at > >> > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) > >> > >> ... 11 more > >> > >> Caused by: java.nio.file.NoSuchFileException: > >> >
Re: 【反馈收集】在 1.11 版本中将 blink planner 作为默认的 planner
你好,这个是预期中的。在新的类型系统下,我们将使用 LocalDateTime 作为 TIMESTAMP 类型的默认对象。 同时我们还禁用了 long 和 TIMESTAMP 对象的直接互转。 具体的原因和细节可以看: https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System Best, Kurt On Fri, Apr 3, 2020 at 4:58 PM 1193216154 <1193216...@qq.com> wrote: > > 你好,最近改成blinkplanner发现了两个问题。及时两者生成的proctime的时间类型不同,一个是TimeStamp,一个是LocalDateTime。 > > > org.apache.flink.table.dataformat.DataFormatConverters中TimestampConverter 的 > > toInternalImpl方法只支持TimeStamp的参数,而我遇见的情况是传进了long类型,导致类转换异常,如果能重载toInternalImpl方法加一个long,或许可以解决我的问题 > --原始邮件-- > 发件人:"Kurt Young" 发送时间:2020年4月1日(星期三) 上午9:22 > 收件人:"user-zh" > 主题:【反馈收集】在 1.11 版本中将 blink planner 作为默认的 planner > > > > 大家好, > > 正如大家所知,Blink planner 是 Flink 1.9 版本中引入的一个全新的 Table API 和 SQL 的翻译优化 > 器,并且我们已经在 1.10 版本中将其作为 SQL CLI 的默认 planner。由于社区很早就已经决定不再 > 针对老的优化器去增添任何新的功能,所以从功能和性能上来说,老的 flink planner 已经缺了很多 > 现在社区最新的 SQL 相关的功能,比如新的类型系统,更多的DDL的支持,以及即将在 1.11 发布 > 的新的 TableSource 和 TableSink 接口和随之而来的对 Binlog 类日志的解析。 > > 因此我们打算尝试在接下来这个 1.11 版本发布时将 blink planner 升级为默认的 planner。但在此之 > 前,我们希望听听您的反馈。尤其是一些使用过程的体验,和您观察到的或者经历过的 blink planner > 做不了的事情从而导致您退回了老的 flink planner 的原因,可能是两边功能不对等,或者 blink planner > 有什么新的 bug 而老的 planner 没有。我们距离 1.11 的需求冻结还有差不多一个月的时间,收到 > 您的反馈之后,我们有足够的时间进行修复和完善。 > > 希望听到您宝贵的声音和意见,谢谢。 > > Best, > Kurt
???????????????????? 1.11 ???????? blink planner ?????????? planner
??blinkplanner??proctime??TimeStampLocalDateTime?? org.apache.flink.table.dataformat.DataFormatConverters??TimestampConverter ?? toInternalImpl??TimeStamp??longtoInternalImpl??long?? ---- ??:"Kurt Young"
Re: [SURVEY] 您在使用什么数据变更同步工具(CDC)?
Hi 大家好, 非常感谢大家的反馈!我想与大家分享下调查问卷的结果。我在中英文邮件列表分别发了中英文的调查问卷,调查结果差异较大,很有意思。 *英文问卷* - Top3 CDC tools: Debezium (66.7%), Oracle GoldenGate (15.6%), Maxwell (13.3%) - Top3 databases: PostgresSQL(52.3%), MySQL (50%), MS SQL Server (34.1%) *中文问卷* - Top3 CDC tools: Canal (70.7%), Debezium (26.8%), Maxwell (23.2%) - Top3 databases: MySQL (85.4%), Oracle (13.4%) PostgresSQL (12.2%) 百分比 = 投票数/投票人数。 这份调查报告对于 Flink 社区支持 CDC 的规划非常有帮助,再次感谢大家的参与! Best, Jark On Thu, 12 Mar 2020 at 10:22, Jark Wu wrote: > Hi Benchao, > > Great feedbacks! > > 1) 全量初始化能力: > 第一版中社区有计划引入 flink sql 直连 mysql 获取 binlog 的方案,该方案可以获取全量+增量 binlog,且有一致性保证。 > 但是对接db 全量+ mq binlog,将会是未来的一个工作,主要的难点在于全量如何平滑切换到 增量的 mq offset 上。 > > 2) 自动生成watermark: > 这也是 roadmap 上的一个工作。 > > 3) binlog以state的形式存储,只需全量加载,后续只接受增量: > 我理解这个和 (1) 是类似的需求。Flink SQL 对接之后 binlog,也即 binlog 数据流转成了一个动态表,也即 Flink > 在维护这个动态表的数据(state)。 > > 4) Schema 变更: > 这会是生产中非常有用的一个特性。但是目前还没有很清楚的方案。 > > 5) flink作为一个数据同步工具: > 这是非常有可能的。基于 Flink 直连 db binlog 的能力,我们可以非常方便地基于该能力搭建出一个同步工具(类似 canal),基于 > flink sql 丰富的生态, > 可以快速对接各种 connector。这也许将来会成为一个三方库。 > > Best, > Jark > > > > > On Wed, 11 Mar 2020 at 23:52, Benchao Li wrote: > >> Hi, >> >> 感谢Jark发起这个话题的讨论,这个功能对于Flink SQL来讲是一个非常重要的扩展。 >> >> 问卷已填,再此再提几个小想法: >> 1. 希望对接binlog时可以有全量初始化的能力,这样在Flink中我们就有了一个全表的实时状态,方便其他表与之进行join。 >> 2. 希望能够自动生成watermark,这样子可以尽可能的减少接入成本。因为有些场景是其他的append >> log数据可以跟实时维护的表进行join;也有些场景是两个binlog形成的动态表互相join。 >> 3. 希望可以把binlog以state的形式存储在flink里,除了第一次启动需要全量加载,后续的运维都可以再此基础上只接收增量即可。 >> 4. 如此之外,如果能有schema变更感知能力是最好的。(当然这个可能很难体现在SQL里面,毕竟SQL作业在启动时就已经确定了table >> 的schema) >> 5. >> >> 最后一点,感觉不太符合flink现在的定位,但是可能会有用户会这样来使用。就是直接把flink作为一个数据同步工具,消费binlog,直接同步到其他存储里面。(可能基本不需要做任何加工的那种,而且最好是能够有自动感知schema变更,同时可以变更下游的存储系统的schema) >> >> Jark Wu 于2020年3月11日周三 下午3:00写道: >> >> > Hi, 大家好, >> > >> > Flink 社区目前正在对接一些 CDC (Change Data Capture) 工具,以期在下个版本能支持读取和处理常见的 binlog >> > 数据,所以需要调研下大家目前主要使用的 CDC 工具是什么。 >> > >> > 欢迎大家填下问卷调查,您的反馈对我们非常重要,谢谢! >> > >> > http://apacheflink.mikecrm.com/wDivVQ1 >> > >> > 也欢迎大家在这个邮件下讨论关于 Flink 对接 CDC 的一些想法、需求、期望。 >> > >> > Best, >> > Jark >> > >> >> >> -- >> >> Benchao Li >> School of Electronics Engineering and Computer Science, Peking University >> Tel:+86-15650713730 >> Email: libenc...@gmail.com; libenc...@pku.edu.cn >> >
flink sql ????time window join ????
hi all??flink sqlwindow join
flink sql ????time window join ????
hi all??flink sqlwindow join
回复:keyby的乱序处理
1.未keyby的话,user1 user2 user3的顺序取决于分区策略,比如forward他们还是会在一个subtask上,顺序还是有序的,如果被打散的话就不确定了 2.keyby的话,可以保证同一个key的后续数据保持有序,不同的key不能保证一定有序 | | Sun.Zhu | | 邮箱:17626017...@163.com | Signature is customized by Netease Mail Master 在2020年03月31日 15:39,tingli ke 写道: HI,再次补充一下我的场景,如下图所示: 1、kafka TopicA的Partiton1的数据包含3个user的数据 2、flink在对该分区生成了w1、w2、w3...的watermark 问题来了: 1、w1、w2、w3...的watermark只能保证user1、user2、user3的整体数据的有序处理对吗? 2、在对user1、user2、user3进行keyby后,w1、w2、w3...的watermark能保证user1或者user2或者user3的有序处理吗? 期待大神的回复! jun su 于2020年3月31日周二 下午1:10写道: hi, keyby后的watermark应该是上游多个线程中最小的watermark , 所以数据虽然可能乱序, 但是watermark并不会乱, 不会影响后续的窗口触发 tingli ke 于2020年3月31日周二 上午9:54写道: > 您好, > 针对您的回复,现在的场景是这样子的 > 1、kafka存在多个partition,针对多个Partiton,flink watermark assiger会对每个Partiton > 发射 watermark; > 2、在第一个前提下,水位已经设置好了,还可以在keyby之后在次设置watermark吗? > 3、是否存在可以不用经过第一个前提的方案,直接在keyby之后设置watermark? > > Jimmy Wong 于2020年3月30日周一 下午9:13写道: > > > Hi, > > watermark 可以在 keyBy 后分配,但是最好紧跟 SourceFunction。经过 KeyBy > > 或其他分配策略,可能导致数据更大的延迟(EventTime)。 > > > > > > “想做key化的乱序处理” 这句没太理解,麻烦解释下。 > > > > > > | | > > Jimmy Wong > > | > > | > > wangzmk...@163.com > > | > > 签名由网易邮箱大师定制 > > > > > > 在2020年03月30日 20:58,tingli ke 写道: > > 请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗 > > > -- Best, Jun Su
Re: 关于使用RocksDBStateBackend TTL 配置的问题
Hi 只是配置state.backend.rocksdb.ttl.compaction.filter.enabled 还需要相关的state descriptor也配置上state ttl config,不确定这里所谓的“不理想”的效果是没有及时删除,还是彻底没有删除? 目前RocksDB的后台清理确实需要依赖于compaction的执行,换言之,如果有部分数据一直没有进入compaction,确实存在理论上的可能性一直不会因为过期而删除,但是这个可能性很低不应该对你的使用体验带来很大的影响。 现在的这两种策略是更新时间戳的策略,只要不再访问,到时间都会因为TTL而自动后台清除的。 定时任务去清除过期state这种思路相比目前的实现会影响性能,而且还需要谨慎的一点是目前Flink的访问模式都是单线程的,要避免多线程同时访问。 祝好 唐云 From: yanggang_it_job Sent: Friday, April 3, 2020 11:14 To: user-zh@flink.apache.org Subject: 关于使用RocksDBStateBackend TTL 配置的问题 Hi: 我们现在启用state.backend.rocksdb.ttl.compaction.filter.enabled进行rocksdb的有效期设置,但效果并不是那么理想。 同时我也有以下问题想不明白: 1、如果rocksdb在compact的时候有些state并没有被compact到,是否就意味着就算这些state已经过期也不会被删除? 2、目前flink的ttl策略只有OnCreateAndWrite和OnReadAndWrite两种策略,是否有那种不需要刷新,到了TTL时间就自动清除。 否则就会出现state一直在刷新导致永远无法删除,最终导致磁盘打满 目前我能想到的方案是,另外写一个定时任务根据配置去清除过期state。 请问大家还有其他更好的方案吗?