Re: 关于flink @deprecated

2020-04-03 文章 Congxian Qiu
Hi
你好,一般你看到 deprecated 的注释,同时会在 Java doc 中看到类似 "Use XX instead"
的语句,也就是建议使用后面的 XX
关于为什么会被 deprecated 可以查看下引入这个修改的 PR 或者相关的 JIRA

Best,
Congxian


guanyq  于2020年4月4日周六 上午10:30写道:

>
>
>
> 辛苦了!
>
>
> flink的一些方法或者类都被@deprecated修饰
> 1.如何找到相对应建议使用的方法或者类,来替换@deprecated修饰的类或方法?
> 2.如何知道为什么这些@deprecated修饰的类或方法被弃用呢?
>
>
>
>
>
>
>
>
>
>
>


关于flink @deprecated

2020-04-03 文章 guanyq



辛苦了!


flink的一些方法或者类都被@deprecated修饰
1.如何找到相对应建议使用的方法或者类,来替换@deprecated修饰的类或方法?
2.如何知道为什么这些@deprecated修饰的类或方法被弃用呢?




 





 

Re: 从savepoint不能恢复问题

2020-04-03 文章 Congxian Qiu
Hi
Savepoint 你可以理解为 OperatorId -> State 的一个映射[1],如果你不指定 OperatorId
则会随机生成一个,所以一般会需要用户指定一个 OperatorId。
另外你使用 SQL,可能 SQL 代码发生变更后,导致最终的作业 DAG 图已经变掉了,也就无法恢复了,首先,你可以看一下你两次执行的作业最终的
DAG 图是否完全一样,另外,可以看一下是否指定了 OperatorId

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/savepoints.html
Best,
Congxian


LakeShen  于2020年4月3日周五 上午10:24写道:

> Hi ,
>
> 这种情况可能是你改变的 Flink SQL 的拓扑结构,导致部分算子的 uid 发生变化,然后在从状态恢复的时候,没有找到算子的状态。
> 所以在开发 SQL 任务的时候,一般更改 SQL 代码时,不要改变其拓扑结构,SQL 任务上线后,就不要在轻意改了。
>
> Best,
> LakeShen
>
> 酷酷的浑蛋  于2020年4月2日周四 下午6:31写道:
>
> > 关键我的程序是flink-sql,其它的算子基本都设置过uid了,flink-sql可以设置uid吗,或者说sql中的自动分配的uid怎么查找呢
> >
> >
> > | |
> > apache22
> > |
> > |
> > apach...@163.com
> > |
> > 签名由网易邮箱大师定制
> > 在2020年4月2日 18:22,Yangze Guo 写道:
> > 如果没有显示指定的话,operator id将是一个随机生成的值[1].
> > 当从savepoint恢复时,将依据这些id来匹配,如果发生了变化,可能是你修改了你的jobGraph。Flink推荐显示的指定operator
> > id字面量。[2]
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#assigning-operator-ids
> > [2]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#faq
> >
> > Best,
> > Yangze Guo
> >
> > On Thu, Apr 2, 2020 at 6:01 PM 酷酷的浑蛋  wrote:
> >
> >
> >
> > Failed to rollback to checkpoint/savepoint
> > hdfs://xxx/savepoint-9d5b7a-66c0340f6672. Cannot map checkpoint/savepoint
> > state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program,
> > because the operator is not available in the new program. If you want to
> > allow to skip this, you can set the --allowNonRestoredState option on the
> > CLI.
> >
> >
> > 像上面这个错误,我怎么根据 operator cbc357ccb763df2852fee8c4fc7d55f2
> 去找是我程序中的哪个算子不能恢复呢?
> > | |
> > apache22
> > |
> > |
> > apach...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
>


Re: 关于使用RocksDBStateBackend TTL 配置的问题

2020-04-03 文章 Congxian Qiu
Hi
1 不参加 compaction 的不会被删除
2 TTL state 暂时无法做到一到时间就清除(我理解你这里担心不删除会有磁盘打满的风险),实际上现在会周期性的清理 state
的。另外如果你真的希望严格控制 state 的过期时间,或者你可以尝试下 ProcessFunction[1] 自己使用 Timer 相关的来控制
State 的生命周期

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html
Best,
Congxian


yanggang_it_job  于2020年4月3日周五 上午11:14写道:

> Hi:
>
>  
> 我们现在启用state.backend.rocksdb.ttl.compaction.filter.enabled进行rocksdb的有效期设置,但效果并不是那么理想。
>同时我也有以下问题想不明白:
>1、如果rocksdb在compact的时候有些state并没有被compact到,是否就意味着就算这些state已经过期也不会被删除?
>
>  2、目前flink的ttl策略只有OnCreateAndWrite和OnReadAndWrite两种策略,是否有那种不需要刷新,到了TTL时间就自动清除。
>  否则就会出现state一直在刷新导致永远无法删除,最终导致磁盘打满
>
>
>目前我能想到的方案是,另外写一个定时任务根据配置去清除过期state。
>请问大家还有其他更好的方案吗?


Re: flink存在推的模式吗?

2020-04-03 文章 Djeng Lee
集群模式下server ip不能直接拿,比如在哪个taskmanager上执行。 如果实在想实现的话可以自己实现个SourceFunction. 

在 2020/3/19 下午8:23,“512348363”<512348...@qq.com> 写入:

例如这个对应的
DataStream

Re: Re: rocksdb作为状态后端任务重启时,恢复作业失败Could not restore keyed state backend for KeyedProcessOperator

2020-04-03 文章 Congxian Qiu
HDFS 上的路径和本地不一样,如果你要看 HDFS 路径的话,可能需要看 Checkjpoint Meta 的相关信息,这个比较麻烦,可以参考
CheckpointMetadataLoadingTest 的相关测试。
我再看了一下你给的 TM Log,看上去是 148 行的 outputStream.close() 出错了(有个比较奇怪的现象是,这里的
outputStream 是本地的文件,但是从错误栈看是 HadoopFileSystem)。你这个是稳定复现的问题吗?如果是的话,能否贴一下打开
debug log,贴一下 JM/TM log,另外能给一个可复现的 作业更好

Best,
Congxian


chenxyz  于2020年4月1日周三 下午5:18写道:

> Hi, 从贤,
> 我查看了下HDFS,
> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160下面是空的,也没有db这一层目录。
>
>
>
>
>
>
>
>
> 在 2020-04-01 16:50:13,"Congxian Qiu"  写道:
> >Hi
> >Restore 可以大致分为两部分,1)下载文件;2)从下载的文件恢复
> >从 TM 日志看像下载出错了,你可以看下
>
> >/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
> >这个文件是不是存在 double check 下,如果是下载失败,你需要确认下下载失败的原因
> >
> >Best,
> >Congxian
> >
> >
> >chenxyz  于2020年4月1日周三 下午3:02写道:
> >
> >> 任务启用rocksdb作为状态后端,任务出现异常重启时经常失败Could not restore keyed state backend for
> >> KeyedProcessOperator。这个问题怎么解决呢?
> >>
> >> 版本:1.10 standalone
> >>
> >> 配置信息:
> >>
> >> state.backend: rocksdb
> >>
> >> state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
> >>
> >> state.backend.incremental: true
> >>
> >> jobmanager.execution.failover-strategy: region
> >>
> >> io.tmp.dirs: /data/flink1_10/tmp
> >>
> >>
> >>
> >>
> >> 任务的checkpoint配置:
> >>
> >> env.enableCheckpointing(2 * 60 * 1000);
> >>
> >>
> >>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >>
> >> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
> >>
> >> env.getCheckpointConfig().setCheckpointTimeout(6);
> >>
> >> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> >>
> >> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
> >>
> >>
> >>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >>
> >>
> >>
> >>
> >> 日志信息:
> >>
> >>
> >>
> >>
> >> 2020-04-01 11:13:03
> >>
> >> java.lang.Exception: Exception while creating
> StreamOperatorStateContext.
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> >>
> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >>
> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >>
> >> at java.lang.Thread.run(Thread.java:748)
> >>
> >> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> >> state backend for
> >> KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of
> the
> >> 1 provided restore options.
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> >>
> >> ... 9 more
> >>
> >> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
> Caught
> >> unexpected exception.
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> >>
> >> ... 11 more
> >>
> >> Caused by: java.nio.file.NoSuchFileException:
> >>
> 

Re: 【反馈收集】在 1.11 版本中将 blink planner 作为默认的 planner

2020-04-03 文章 Kurt Young
你好,这个是预期中的。在新的类型系统下,我们将使用 LocalDateTime 作为 TIMESTAMP 类型的默认对象。
同时我们还禁用了 long 和 TIMESTAMP 对象的直接互转。
具体的原因和细节可以看:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System

Best,
Kurt


On Fri, Apr 3, 2020 at 4:58 PM 1193216154 <1193216...@qq.com> wrote:

>
> 你好,最近改成blinkplanner发现了两个问题。及时两者生成的proctime的时间类型不同,一个是TimeStamp,一个是LocalDateTime。
>
>
> org.apache.flink.table.dataformat.DataFormatConverters中TimestampConverter 的
>
> toInternalImpl方法只支持TimeStamp的参数,而我遇见的情况是传进了long类型,导致类转换异常,如果能重载toInternalImpl方法加一个long,或许可以解决我的问题
> --原始邮件--
> 发件人:"Kurt Young" 发送时间:2020年4月1日(星期三) 上午9:22
> 收件人:"user-zh"
> 主题:【反馈收集】在 1.11 版本中将 blink planner 作为默认的 planner
>
>
>
> 大家好,
>
> 正如大家所知,Blink planner 是 Flink 1.9 版本中引入的一个全新的 Table API 和 SQL 的翻译优化
> 器,并且我们已经在 1.10 版本中将其作为 SQL CLI 的默认 planner。由于社区很早就已经决定不再
> 针对老的优化器去增添任何新的功能,所以从功能和性能上来说,老的 flink planner 已经缺了很多
> 现在社区最新的 SQL 相关的功能,比如新的类型系统,更多的DDL的支持,以及即将在 1.11 发布
> 的新的 TableSource 和 TableSink 接口和随之而来的对 Binlog 类日志的解析。
>
> 因此我们打算尝试在接下来这个 1.11 版本发布时将 blink planner 升级为默认的 planner。但在此之
> 前,我们希望听听您的反馈。尤其是一些使用过程的体验,和您观察到的或者经历过的 blink planner
> 做不了的事情从而导致您退回了老的 flink planner 的原因,可能是两边功能不对等,或者 blink planner
> 有什么新的 bug 而老的 planner 没有。我们距离 1.11 的需求冻结还有差不多一个月的时间,收到
> 您的反馈之后,我们有足够的时间进行修复和完善。
>
> 希望听到您宝贵的声音和意见,谢谢。
>
> Best,
> Kurt


???????????????????? 1.11 ???????? blink planner ?????????? planner

2020-04-03 文章 1193216154
??blinkplanner??proctime??TimeStampLocalDateTime??


org.apache.flink.table.dataformat.DataFormatConverters??TimestampConverter ??
toInternalImpl??TimeStamp??longtoInternalImpl??long??
----
??:"Kurt Young"

Re: [SURVEY] 您在使用什么数据变更同步工具(CDC)?

2020-04-03 文章 Jark Wu
Hi 大家好,

非常感谢大家的反馈!我想与大家分享下调查问卷的结果。我在中英文邮件列表分别发了中英文的调查问卷,调查结果差异较大,很有意思。

*英文问卷*
- Top3 CDC tools: Debezium (66.7%), Oracle GoldenGate (15.6%), Maxwell
(13.3%)
- Top3 databases: PostgresSQL(52.3%), MySQL (50%), MS SQL Server (34.1%)

*中文问卷*
- Top3 CDC tools: Canal (70.7%), Debezium (26.8%), Maxwell (23.2%)
- Top3 databases: MySQL (85.4%), Oracle (13.4%) PostgresSQL (12.2%)

百分比 = 投票数/投票人数。

这份调查报告对于 Flink 社区支持 CDC 的规划非常有帮助,再次感谢大家的参与!

Best,
Jark

On Thu, 12 Mar 2020 at 10:22, Jark Wu  wrote:

> Hi Benchao,
>
> Great feedbacks!
>
> 1) 全量初始化能力:
> 第一版中社区有计划引入 flink sql 直连 mysql 获取 binlog 的方案,该方案可以获取全量+增量 binlog,且有一致性保证。
> 但是对接db 全量+ mq binlog,将会是未来的一个工作,主要的难点在于全量如何平滑切换到 增量的 mq offset 上。
>
> 2) 自动生成watermark:
> 这也是 roadmap 上的一个工作。
>
> 3) binlog以state的形式存储,只需全量加载,后续只接受增量:
> 我理解这个和 (1) 是类似的需求。Flink SQL 对接之后 binlog,也即 binlog 数据流转成了一个动态表,也即 Flink
> 在维护这个动态表的数据(state)。
>
> 4) Schema 变更:
> 这会是生产中非常有用的一个特性。但是目前还没有很清楚的方案。
>
> 5) flink作为一个数据同步工具:
> 这是非常有可能的。基于 Flink 直连 db binlog 的能力,我们可以非常方便地基于该能力搭建出一个同步工具(类似 canal),基于
> flink sql 丰富的生态,
> 可以快速对接各种 connector。这也许将来会成为一个三方库。
>
> Best,
> Jark
>
>
>
>
> On Wed, 11 Mar 2020 at 23:52, Benchao Li  wrote:
>
>> Hi,
>>
>> 感谢Jark发起这个话题的讨论,这个功能对于Flink SQL来讲是一个非常重要的扩展。
>>
>> 问卷已填,再此再提几个小想法:
>> 1. 希望对接binlog时可以有全量初始化的能力,这样在Flink中我们就有了一个全表的实时状态,方便其他表与之进行join。
>> 2. 希望能够自动生成watermark,这样子可以尽可能的减少接入成本。因为有些场景是其他的append
>> log数据可以跟实时维护的表进行join;也有些场景是两个binlog形成的动态表互相join。
>> 3. 希望可以把binlog以state的形式存储在flink里,除了第一次启动需要全量加载,后续的运维都可以再此基础上只接收增量即可。
>> 4. 如此之外,如果能有schema变更感知能力是最好的。(当然这个可能很难体现在SQL里面,毕竟SQL作业在启动时就已经确定了table
>> 的schema)
>> 5.
>>
>> 最后一点,感觉不太符合flink现在的定位,但是可能会有用户会这样来使用。就是直接把flink作为一个数据同步工具,消费binlog,直接同步到其他存储里面。(可能基本不需要做任何加工的那种,而且最好是能够有自动感知schema变更,同时可以变更下游的存储系统的schema)
>>
>> Jark Wu  于2020年3月11日周三 下午3:00写道:
>>
>> > Hi, 大家好,
>> >
>> > Flink 社区目前正在对接一些 CDC (Change Data Capture) 工具,以期在下个版本能支持读取和处理常见的 binlog
>> > 数据,所以需要调研下大家目前主要使用的 CDC 工具是什么。
>> >
>> > 欢迎大家填下问卷调查,您的反馈对我们非常重要,谢谢!
>> >
>> > http://apacheflink.mikecrm.com/wDivVQ1
>> >
>> > 也欢迎大家在这个邮件下讨论关于 Flink 对接 CDC 的一些想法、需求、期望。
>> >
>> > Best,
>> > Jark
>> >
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>


flink sql ????time window join ????

2020-04-03 文章 claylin
hi all??flink sqlwindow join

flink sql ????time window join ????

2020-04-03 文章 claylin
hi all??flink sqlwindow join

回复:keyby的乱序处理

2020-04-03 文章 Sun.Zhu
1.未keyby的话,user1 user2 
user3的顺序取决于分区策略,比如forward他们还是会在一个subtask上,顺序还是有序的,如果被打散的话就不确定了
2.keyby的话,可以保证同一个key的后续数据保持有序,不同的key不能保证一定有序




| |
Sun.Zhu
|
|
邮箱:17626017...@163.com
|

Signature is customized by Netease Mail Master

在2020年03月31日 15:39,tingli ke 写道:
HI,再次补充一下我的场景,如下图所示:
1、kafka TopicA的Partiton1的数据包含3个user的数据
2、flink在对该分区生成了w1、w2、w3...的watermark


问题来了:
1、w1、w2、w3...的watermark只能保证user1、user2、user3的整体数据的有序处理对吗?

2、在对user1、user2、user3进行keyby后,w1、w2、w3...的watermark能保证user1或者user2或者user3的有序处理吗?


期待大神的回复!




jun su  于2020年3月31日周二 下午1:10写道:

hi,
keyby后的watermark应该是上游多个线程中最小的watermark , 所以数据虽然可能乱序, 但是watermark并不会乱,
不会影响后续的窗口触发

tingli ke  于2020年3月31日周二 上午9:54写道:

> 您好,
> 针对您的回复,现在的场景是这样子的
> 1、kafka存在多个partition,针对多个Partiton,flink watermark assiger会对每个Partiton
> 发射 watermark;
> 2、在第一个前提下,水位已经设置好了,还可以在keyby之后在次设置watermark吗?
> 3、是否存在可以不用经过第一个前提的方案,直接在keyby之后设置watermark?
>
> Jimmy Wong  于2020年3月30日周一 下午9:13写道:
>
> > Hi,
> > watermark 可以在 keyBy 后分配,但是最好紧跟 SourceFunction。经过 KeyBy
> > 或其他分配策略,可能导致数据更大的延迟(EventTime)。
> >
> >
> > “想做key化的乱序处理” 这句没太理解,麻烦解释下。
> >
> >
> > | |
> > Jimmy Wong
> > |
> > |
> > wangzmk...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年03月30日 20:58,tingli ke 写道:
> > 请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗
> >
>


--
Best,
Jun Su


Re: 关于使用RocksDBStateBackend TTL 配置的问题

2020-04-03 文章 Yun Tang
Hi

只是配置state.backend.rocksdb.ttl.compaction.filter.enabled 还需要相关的state 
descriptor也配置上state ttl config,不确定这里所谓的“不理想”的效果是没有及时删除,还是彻底没有删除?

目前RocksDB的后台清理确实需要依赖于compaction的执行,换言之,如果有部分数据一直没有进入compaction,确实存在理论上的可能性一直不会因为过期而删除,但是这个可能性很低不应该对你的使用体验带来很大的影响。

现在的这两种策略是更新时间戳的策略,只要不再访问,到时间都会因为TTL而自动后台清除的。

定时任务去清除过期state这种思路相比目前的实现会影响性能,而且还需要谨慎的一点是目前Flink的访问模式都是单线程的,要避免多线程同时访问。



祝好
唐云

​

From: yanggang_it_job 
Sent: Friday, April 3, 2020 11:14
To: user-zh@flink.apache.org 
Subject: 关于使用RocksDBStateBackend TTL 配置的问题

Hi:
   
我们现在启用state.backend.rocksdb.ttl.compaction.filter.enabled进行rocksdb的有效期设置,但效果并不是那么理想。
   同时我也有以下问题想不明白:
   1、如果rocksdb在compact的时候有些state并没有被compact到,是否就意味着就算这些state已经过期也不会被删除?
   2、目前flink的ttl策略只有OnCreateAndWrite和OnReadAndWrite两种策略,是否有那种不需要刷新,到了TTL时间就自动清除。
 否则就会出现state一直在刷新导致永远无法删除,最终导致磁盘打满


   目前我能想到的方案是,另外写一个定时任务根据配置去清除过期state。
   请问大家还有其他更好的方案吗?