Re: Re: How to drop a proportion of the data consumed from Kafka
We once built a connector that, when the consumed Kafka data backs up (during traffic peaks), drops data older than a certain timestamp or offset so that relatively fresh data gets consumed first. Different dropping strategies call for different designs; for a scenario this customized you will probably have to implement it yourself.

--
Best,
yuchuan

At 2022-03-12 17:45:59, "史延朋" wrote:
>Also, if the data volume is only high at certain moments, would rate limiting on the connector side be a better fit? It might also be worth opening an issue to discuss connector-side rate limiting (I'm not sure whether such a discussion already exists).
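The freshness-based dropping described above could be sketched roughly as below. This is a minimal illustration, not the actual connector code; the class name `StaleRecordFilter` and the `maxAgeMs` threshold are hypothetical.

```java
/**
 * Sketch: during a backlog, drop records whose timestamp lags behind "now"
 * by more than maxAgeMs, so the consumer catches up on fresher data.
 * Names are hypothetical; a real connector would plug this into its
 * deserialization or fetch path.
 */
public class StaleRecordFilter {
    private final long maxAgeMs;

    public StaleRecordFilter(long maxAgeMs) {
        this.maxAgeMs = maxAgeMs;
    }

    /** Keep the record only if it is fresh enough. */
    public boolean keep(long recordTimestampMs, long nowMs) {
        return nowMs - recordTimestampMs <= maxAgeMs;
    }
}
```

An offset-based strategy would look the same, just comparing the record offset against a cutoff offset instead of comparing timestamps.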
YarnTaskExecutorRunner should contain MapReduce classes
Hi:
I followed the instructions in [https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive] and tested the Hive streaming sink, but hit this exception: Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf. [http://apache-flink.147419.n8.nabble.com/Flink-td7866.html] reports the same problem. I checked the TM JVM environment variables and the code, and found that Flink only sets up YARN_APPLICATION_CLASSPATH, without MAPREDUCE_APPLICATION_CLASSPATH. See: [https://github.com/apache/flink/blob/ed39fb2efc790af038c1babd4a48847b7b39f91e/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java#L119] I think we should add MAPREDUCE_APPLICATION_CLASSPATH as well, just as Spark does. I created https://issues.apache.org/jira/browse/FLINK-23449 for this and hope someone can review it.
Flink job restarts over and over again if a TaskManager's disk is damaged
Hi everyone:
I hit this exception when a hard disk was damaged: https://issues.apache.org/jira/secure/attachment/13009035/13009035_flink_disk_error.png

I checked the code and found that Flink creates a temp file when the record length is > 5 MB:

// SpillingAdaptiveSpanningRecordDeserializer.java
if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
    // create a spilling channel and put the data there
    this.spillingChannel = createSpillingChannel();
    ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
    FileUtils.writeCompletely(this.spillingChannel, toWrite);
}

The tempDir is randomly picked from all `tempDirs`, and in YARN mode one `tempDir` usually corresponds to one hard disk. In my opinion, if a hard disk is damaged, the TaskManager should pick another disk (tempDir) for the spilling channel rather than throw an IOException, which causes the Flink job to restart over and over again. I have created a JIRA issue (https://issues.apache.org/jira/browse/FLINK-18811) to track this, and I'd appreciate it if someone could review the code or discuss the issue. Thanks!
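The fallback behavior proposed above could be sketched like this. This is a hypothetical helper, not the actual SpillingAdaptiveSpanningRecordDeserializer code: it tries each configured temp directory in turn and fails only if every one of them is unusable.

```java
import java.io.File;
import java.io.IOException;
import java.util.List;

/** Sketch: pick a usable spill directory, skipping past damaged disks. */
public class SpillDirSelector {
    /**
     * Returns the first directory in which a probe file can be created.
     * Throws only if every tempDir (i.e. every disk) fails, instead of
     * failing immediately on the first bad disk.
     */
    public static File selectUsableDir(List<File> tempDirs) throws IOException {
        for (File dir : tempDirs) {
            try {
                File probe = File.createTempFile("spill-probe", null, dir);
                probe.delete();
                return dir; // this disk works
            } catch (IOException e) {
                // disk likely damaged or missing; try the next tempDir
            }
        }
        throw new IOException("No usable temp directory for spilling");
    }
}
```

With something like this, the spilling channel would only fail the task when all disks are gone, rather than when the randomly chosen one happens to be damaged.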
flink-1.11 native k8s: storing state on Ceph
Hi:
I'd like to ask whether anyone stores Flink state and HA data on Ceph (replacing HDFS) in native Kubernetes mode. In my tests, Flink on native K8s cannot mount CephFS; does supporting this require code changes? I found a related issue in the Flink JIRA (https://issues.apache.org/jira/browse/FLINK-15649). Could anyone explain how to mount CephFS with flink-1.11? And if CephFS cannot be mounted, which storage options other than HDFS can be used? Any guidance is appreciated.

--
Best,
yuchuan
flink-1.11: jobID issue when using application mode
hi:
While testing flink-1.11 application mode I found that with HA enabled the jobID is always ; with HA disabled it is a random string (the same as in previous versions). Is this a bug, or is it designed this way? Any explanation is appreciated.

--
Best,
yuchuan
Re: Re: Reply: flink1.9.1 monitored via Prometheus Pushgateway — Grafana still shows data after the job is killed
hi:
You can try prometheus-pushgateway-cleaner, which can run in Docker: https://github.com/jorinvo/prometheus-pushgateway-cleaner It periodically cleans up Pushgateway metrics that have not been updated within a configured time window. Hope that helps.

--
Best,
yuchuan

At 2020-09-02 11:25:25, "Jim Chen" wrote:
>Can we get the report time of the metrics pushed to Pushgateway via the REST API, and use that time to decide dynamically which groups to delete?
>
>xiao cai wrote on Tue, Sep 1, 2020 at 8:52 PM:
>
>> Hi shizk233:
>> I reproduced exactly the same situation you described.
>>
>> You can try a scheduled task that checks the Flink job's status and, once it is no longer in a running state, actively calls Pushgateway's delete method to remove the Pushgateway metrics.
>>
>> Original message
>> From: shizk233
>> To: user-zh@flink.apache.org
>> Sent: Tuesday, Sep 1, 2020 19:10
>> Subject: Re: Reply: flink1.9.1 monitored via Prometheus Pushgateway — Grafana still shows data after the job is killed
>>
>> Hi Xiao, in my experience that option only deletes the JobManager's metrics group, not the TaskManagers'.
>> We enabled randomJobNameSuffix, which puts JM and TM metrics into different metrics groups.
>> This might be a bug? xiao cai wrote on Tue, Sep 1, 2020 at 4:57 PM: > Hi: > >> You can try adding this to flink-conf.yaml: > metrics.reporter.promgateway.deleteOnShutdown: >> true > > > Best, > Xiao > Original message > From: bradyMk > To: >> user-zh > Sent: Tuesday, Sep 1, 2020 16:50 > Subject: Re: >> Reply: flink1.9.1 monitored via Prometheus Pushgateway — Grafana still shows data after the job is killed > > > >> I'm not sure I follow. For example, I have a metric flink_jobmanager_job_uptime that tracks how long a job has been running; > >> if the job is killed, that metric freezes at a constant value and keeps showing in Grafana. I don't really know PromQL; I tried > >> flink_jobmanager_job_uptime[1m], but that is an invalid query. Following your suggestion, how should I change it? - Best Wishes >> > -- Sent from: http://apache-flink.147419.n8.nabble.com/
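Both suggestions in the thread above come down to issuing an HTTP DELETE against Pushgateway's grouping-key endpoint once the Flink job stops. A hedged sketch of building that URL (the `instance` grouping label here is illustrative — use whatever grouping key your reporter actually pushed with, and note that label values may need URL encoding):

```java
/** Sketch: build the Pushgateway DELETE URL for a finished Flink job's metric group. */
public class PushgatewayCleaner {
    /**
     * Pushgateway deletes a whole metric group via
     * DELETE /metrics/job/<jobName>[/<label>/<value>...].
     * The "instance" label below is an assumption for illustration.
     */
    public static String deleteUrl(String gatewayBase, String jobName, String instance) {
        return gatewayBase + "/metrics/job/" + jobName + "/instance/" + instance;
    }
}
```

A scheduled task could then poll the Flink REST API for the job status and, once the job is no longer RUNNING, send `curl -X DELETE <url>` (or the equivalent HTTP client call) to that address.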
Re:Re: Flink 1.10 on Yarn
hi xuhaiLong, the checkpoint NullPointerException in your log is a known issue; see the two JIRA tickets below. Which JDK version are you using? So far the checkpoint NullPointerException has been seen with jdk8_40/jdk8_60 + flink-1.10, so try upgrading the JDK.
https://issues.apache.org/jira/browse/FLINK-18196
https://issues.apache.org/jira/browse/FLINK-17479

At 2020-08-07 12:50:23, "xuhaiLong" wrote:
Sorry, I attached the wrong file. Yes, taskmanager.memory.jvm-metaspace.size is the default configuration.
On 8/7/2020 11:43, Yangze Guo wrote:
The log attachment did not come through; is taskmanager.memory.jvm-metaspace.size currently the default configuration? Best, Yangze Guo
On Fri, Aug 7, 2020 at 11:38 AM xuhaiLong wrote:
Hi
Scenario: one TM with three slots, running three jobs. While running, the three jobs hit a NullPointerException during checkpointing, which made the tasks restart continuously and eventually filled up Metaspace, producing `java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case 'taskmanager.memory.jvm-metaspace.size' configuration option should be increased. If the error persists (usually in cluster after several job (re-)submissions) then there is probably a class loading leak which has to be investigated and fixed. The task executor has to be shutdown... ` The attachment contains part of the exception.
Questions:
1. Why does the NullPointerException occur during checkpointing? (The three jobs consume the same Kafka topic, and the jobs run fine after being restored from a checkpoint, so it should not be a data problem.)
2. The logs show the tasks can restart, so why does the problem persist after the automatic restart, causing endless restarts?
Thanks~~~
Cloud attachment sent via NetEase Mail: 08-07error.txt (730.4 KB, expires 2020-08-22 11:37) Download
Re: Re: Reply: flink1.10 checkpoint NullPointerException after running for a while
hi, Congxian. I also found this odd when I first discovered it, but after adding some logging it did confirm my hypothesis. Since neither <older JDK + flink-1.9> nor <newer JDK + flink-1.10> throws the NPE (see FLINK-17479), my guess is that it is related to how garbage collection treats the outer variables captured by the lambda, combined with the MailBox model introduced in 1.10: the outer checkpointMetaData instance gets collected unexpectedly. So in the fix patch I instantiate a new checkpointMetaData inside the lambda; so far this has been effective and I have not seen the NPE again. This is a temporary fix; the root cause may need further analysis.

--
Best,
yuchuan

At 2020-07-06 14:04:58, "Congxian Qiu" wrote:
>@陈凯 Thanks for sharing this approach. I'm curious what the difference between the two actually is. The patch copies the CheckpointMeta at the start of the closure, which implies that between lines 845 and 867 the original checkpointMeta becomes null; that is strange.
>
>Best,
>Congxian
>
>陈凯 wrote on Mon, Jul 6, 2020 at 9:53 AM:
>
>>
>> Hi, zhisheng, 程龙. We hit this problem too, on jdk8_40; older JDK versions do have a fairly high chance of the NPE.
>> I filed a JIRA describing it:
>> https://issues.apache.org/jira/browse/FLINK-18196
>>
>> After modifying the checkpoint-related code, we no longer saw the NPE on the older JDK either. If you really cannot upgrade the JDK, you can refer to this patch:
>>
>> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
>>
>>
>>
>> -----Original Message-----
>> From: zhisheng
>> Sent: Sunday, Jul 5, 2020 15:01
>> To: user-zh
>> Subject: Re: Reply: flink1.10 checkpoint NullPointerException after running for a while
>>
>> The JDK is used in many places across the production cluster, so we dare not switch versions casually. We'll keep observing; if this problem appears frequently we'll consider upgrading. Thanks, Congxian.
>>
>> Best!
>> zhisheng
>>
>> Congxian Qiu wrote on Sat, Jul 4, 2020 at 3:21 PM:
>>
>> > @zhisheng Have you tried switching the JDK version? Does switching versions solve this problem?
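The workaround described above — taking a fresh copy of the metadata inside the lambda so the async checkpoint path no longer depends on the captured outer instance — can be illustrated schematically. The types and names below are simplified stand-ins, not the actual Flink classes; see the linked patch for the real change.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Schematic illustration of the FLINK-18196 workaround: copy the captured
 * state into a new object inside the closure instead of relying on the
 * outer instance for the rest of the asynchronous path.
 */
public class ClosureCopyExample {
    /** Stand-in for Flink's CheckpointMetaData (hypothetical, simplified). */
    static class CheckpointMeta {
        final long checkpointId;
        final long timestamp;
        CheckpointMeta(long id, long ts) { this.checkpointId = id; this.timestamp = ts; }
    }

    static CompletableFuture<Long> runCheckpoint(CheckpointMeta outerMeta) {
        return CompletableFuture.supplyAsync(() -> {
            // Workaround: re-instantiate inside the lambda rather than keep
            // using the captured outerMeta reference on the async path.
            CheckpointMeta localMeta =
                new CheckpointMeta(outerMeta.checkpointId, outerMeta.timestamp);
            return localMeta.checkpointId;
        });
    }
}
```

The patch's observable effect is only that the async path reads a freshly created object; whether this truly addresses a GC or JIT issue in the affected JDK builds is, as the message says, still undiagnosed.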
>> > Best,
>> > Congxian
>> >
>> > zhisheng wrote on Sat, Jul 4, 2020 at 12:27 PM:
>> >
>> > > We have also seen this exception, though it is not very common.
>> > >
>> > > Congxian Qiu wrote on Fri, Jul 3, 2020 at 2:08 PM:
>> > >
>> > > > Check whether FLINK-17479[1] matches your problem; if so, try changing the JDK version.
>> > > > [1] https://issues.apache.org/jira/browse/FLINK-17479
>> > > > Best,
>> > > > Congxian
>> > > >
>> > > > 程龙 <13162790...@163.com> wrote on Wed, Jul 1, 2020 at 9:09 PM:
>> > > >
>> > > > > They are all "cannot allocate resource (slot)" errors; it should still be caused by the checkpoint being null, though I don't know why it is null.
>> > > > >
>> > > > > At 2020-07-01 20:51:34, "JasonLee" <17610775...@163.com> wrote:
>> > > > > >Go to the specific TM, find the relevant operator, and check whether there is any exception information.
>> > > > > >
>> > > > > >JasonLee
>> > > > > >Email: 17610775...@163.com
>> > > > > >
>> > > > > >Signature is customized by Netease Mail Master
>> > > > > >
>> > > > > >On Jul 1, 2020 at 20:43, 程龙 wrote:
>> > > > > >On flink1.10, after the program has been running for a few hours it reports that it cannot perform a checkpoint, with a NullPointerException as follows:
>> > > > > >
>> > > > > >java.lang.Exception: Could not perform checkpoint 3201 for operator Filter -> Map (2/8).
>> > > > > > at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
>> > > > > > at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
>> > > > > > at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>> > > > > > at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>> > > > > > at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
>> > > > > > at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>> > > > > > at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
>> > > > > > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>> > > > > > at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
>> > > > > > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
>> > > > > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>> > > > > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>> > > > > > at java.lang.Thread.run(Thread.java:745)
>> > > > > >Caused by: java.lang.NullPointerException
>> > > > > > at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
>> > > > > > at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
>> > > > > > at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
>> > > > > > at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> > > > > > at
Re: Seeking help with a Flink 1.10 SQL error
I ran into this problem too; it may be a checkpoint bug. After I modified the Flink source it stopped erroring; you can refer to this patch: https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19

At 2020-06-05 15:06:48, "hb" <343122...@163.com> wrote:
>The Flink SQL job runs fine at first and checkpoints normally, with no large state (19.7 KB of state),
>but after running for a while it starts failing; it recovers and runs for a few seconds, then immediately hits the same error again, looping between failure and recovery endlessly.
>Could someone take a look? Much appreciated.
>
>
>2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task - >Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> >SourceConversion(table=[default_catalog.default_database.user_visit_trace, >source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], >fields=[userId, utp, utrp, extendFields, requestTime]) -> >Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], >where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND >(((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'MMdd') > >_UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS NOT >NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], >joinType=[InnerJoin], async=[false], lookup=[user_id=userId], where=[shop_id >IS NOT NULL], select=[item_id, userId, requestTime, shop_id, user_id]) -> >Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / 1000) >FROM_UNIXTIME _UTF-16LE'MMdd') CONCAT >_UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT _UTF-16LE':' >CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> >SinkConversionToTuple2 -> Sink: Unnamed (1/8) >(ad7a2d51beccbb39f83ac2c16b923bd0) switched from RUNNING to FAILED.
>java.lang.Exception: Could not perform checkpoint 401 for operator Source: >KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> >SourceConversion(table=[default_catalog.default_database.user_visit_trace, >source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], >fields=[userId, utp, utrp, extendFields, requestTime]) -> >Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], >where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND >(((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'MMdd') > >_UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS NOT >NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], >joinType=[InnerJoin], async=[false], lookup=[user_id=userId], where=[shop_id >IS NOT NULL], select=[item_id, userId, requestTime, shop_id, user_id]) -> >Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / 1000) >FROM_UNIXTIME _UTF-16LE'MMdd') CONCAT >_UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT _UTF-16LE':' >CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> >SinkConversionToTuple2 -> Sink: Unnamed (1/8). 
>at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802) >at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777) >at > org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$595/1311782208.call(Unknown > Source) >at java.util.concurrent.FutureTask.run(FutureTask.java:266) >at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87) >at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) >at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261) >at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) >at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) >at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) >at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) >at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) >at java.lang.Thread.run(Thread.java:745) >Caused by: java.lang.NullPointerException >at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411) >at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991) >at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887) >at > org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$597/1708409807.run(Unknown > Source) >at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) >at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860) >at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793) >... 
12 more >2020-06-05 14:13:19,795 INFO org.apache.flink.runtime.taskmanager.Task - >Attempting to fail task externally
Re: Re: Re: flink-1.10 checkpoint occasionally throws NullPointerException
Hi:
After adding some logging I found that it is checkpointMetaData that becomes NULL:
https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1421

The test program reads from Kafka, runs a wordcount, and writes the result back to Kafka. The checkpoint configuration is:

| Checkpointing Mode | Exactly Once |
| Interval | 5s |
| Timeout | 10m 0s |
| Minimum Pause Between Checkpoints | 0ms |
| Maximum Concurrent Checkpoints | 1 |

It consistently throws the NPE at the 5377th checkpoint. Although the cause is still unclear, after modifying part of the code (see https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19) the NPE no longer occurs.

At 2020-04-21 10:21:56, "chenkaibit" wrote:
>
>This is not stably reproducible, but it showed up in several jobs I tested recently on 1.10, and there were no other errors when it triggered. I've added some logging and will keep watching.
>
>At 2020-04-21 01:12:48, "Yun Tang" wrote:
>>Hi
>>
>>This NPE is a bit odd; from the executeCheckpointing method[1] it is hard to pinpoint exactly which variable, or which variable's value, is null.
>>One way to investigate is to enable DEBUG-level logging for org.apache.flink.streaming.runtime.tasks and use the debug logs to narrow down which variable is null.
>>
>>When this exception occurred, was there anything abnormal in the logs of the related task? What triggers this NPE, and is it stably reproducible?
>>
>>[1] https://github.com/apache/flink/blob/aa4eb8f0c9ce74e6b92c3d9be5dc8e8cb536239d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1349
>>
>>Best regards,
>>Yun Tang
>>
>>From: chenkaibit
>>Sent: Monday, April 20, 2020 18:39
>>To: user-zh@flink.apache.org
>>Subject: flink-1.10 checkpoint occasionally throws NullPointerException
>>
>>Has anyone seen this error? CheckpointOperation.executeCheckpointing throws a NullPointerException
>>java.lang.Exception: Could not perform checkpoint 5505 for operator Source: >>KafkaTableSource(xxx) -> SourceConversion(table=[xxx, source: >>[KafkaTableSource(xxx)]], fields=[xxx]) -> Calc(select=[xxx) AS xxx]) -> >>SinkConversionToTuple2 -> Sink: Elasticsearch6UpsertTableSink(xxx) (1/1).
>> >>at >> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802) >> >>at >> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777) >> >>at >> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$228/1024478318.call(UnknownSource) >> >>at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> >>at >> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87) >> >>at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) >> >>at >> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261) >> >>at >> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) >> >>at >> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) >> >>at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) >> >>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) >> >>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) >> >>at java.lang.Thread.run(Thread.java:745) >> >>Causedby: java.lang.NullPointerException >> >>at >> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411) >> >>at >> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991) >> >>at >> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887) >> >>at >> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$229/1010499540.run(UnknownSource) >> >>at >> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) >> >>at >> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860) >> >>at >> 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793) >> >>... 12 more
Re: Re: flink-1.10 checkpoint occasionally throws NullPointerException
This is not stably reproducible, but it showed up in several jobs I tested recently on 1.10, and there were no other errors when it triggered. I've added some logging and will keep watching.

At 2020-04-21 01:12:48, "Yun Tang" wrote:
>Hi
>
>This NPE is a bit odd; from the executeCheckpointing method[1] it is hard to pinpoint exactly which variable, or which variable's value, is null.
>One way to investigate is to enable DEBUG-level logging for org.apache.flink.streaming.runtime.tasks and use the debug logs to narrow down which variable is null.
>
>When this exception occurred, was there anything abnormal in the logs of the related task? What triggers this NPE, and is it stably reproducible?
>
>[1] https://github.com/apache/flink/blob/aa4eb8f0c9ce74e6b92c3d9be5dc8e8cb536239d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1349
>
>Best regards,
>Yun Tang
>
>From: chenkaibit
>Sent: Monday, April 20, 2020 18:39
>To: user-zh@flink.apache.org
>Subject: flink-1.10 checkpoint occasionally throws NullPointerException
>
>Has anyone seen this error? CheckpointOperation.executeCheckpointing throws a NullPointerException
>java.lang.Exception: Could not perform checkpoint 5505 for operator Source: >KafkaTableSource(xxx) -> SourceConversion(table=[xxx, source: >[KafkaTableSource(xxx)]], fields=[xxx]) -> Calc(select=[xxx) AS xxx]) -> >SinkConversionToTuple2 -> Sink: Elasticsearch6UpsertTableSink(xxx) (1/1).
> >at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802) > >at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777) > >at > org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$228/1024478318.call(UnknownSource) > >at java.util.concurrent.FutureTask.run(FutureTask.java:266) > >at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87) > >at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) > >at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261) > >at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) > >at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) > >at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) > >at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > >at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > >at java.lang.Thread.run(Thread.java:745) > >Causedby: java.lang.NullPointerException > >at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411) > >at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991) > >at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887) > >at > org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$229/1010499540.run(UnknownSource) > >at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > >at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860) > >at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793) > >... 12 more
flink-1.10 checkpoint occasionally throws NullPointerException
Has anyone seen this error? CheckpointOperation.executeCheckpointing throws a NullPointerException:

java.lang.Exception: Could not perform checkpoint 5505 for operator Source: KafkaTableSource(xxx) -> SourceConversion(table=[xxx, source: [KafkaTableSource(xxx)]], fields=[xxx]) -> Calc(select=[xxx) AS xxx]) -> SinkConversionToTuple2 -> Sink: Elasticsearch6UpsertTableSink(xxx) (1/1).
 at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
 at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$228/1024478318.call(Unknown Source)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
 at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
 at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
 at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$229/1010499540.run(Unknown Source)
 at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
 ... 12 more
When will flink HiveTableSink support streaming-mode writes?
Hi:
I see that https://issues.apache.org/jira/browse/FLINK-14255 introduced a FileSystemStreamingSink, which seems to be preparation for HiveTableSink supporting streaming-mode writes. In which upcoming release is this feature expected to be officially available?