Re:Re: How to drop a proportion of the data consumed from Kafka

2022-03-12 Thread chenkaibit

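We built a connector that, when consumption of Kafka data falls behind (during traffic peaks), drops the data before a certain time or offset so that it consumes relatively recent data as much as possible. Different drop strategies call for different designs; a customized scenario like this probably has to be implemented yourself.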
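
As a rough illustration of the two drop strategies mentioned above (by time and by proportion), here is a minimal Java sketch. `BacklogRecord` and `BacklogDropPolicy` are hypothetical names, not part of any Flink or Kafka API. The sketch assumes each record carries its Kafka offset and timestamp, and it uses the offset rather than a random number to choose which records to drop, so that replaying the same offsets after a failover drops exactly the same records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a consumed Kafka record; a real connector would take
// the offset and timestamp from the Kafka ConsumerRecord.
final class BacklogRecord {
    final long offset;
    final long timestampMs;

    BacklogRecord(long offset, long timestampMs) {
        this.offset = offset;
        this.timestampMs = timestampMs;
    }
}

// Hypothetical drop policy: skip records older than a cutoff time, then drop a
// fixed percentage of the remaining records.
final class BacklogDropPolicy {
    private final long minTimestampMs; // records older than this are dropped
    private final int dropPercent;     // share of the remaining records to drop, 0..100

    BacklogDropPolicy(long minTimestampMs, int dropPercent) {
        if (dropPercent < 0 || dropPercent > 100) {
            throw new IllegalArgumentException("dropPercent must be in [0, 100]");
        }
        this.minTimestampMs = minTimestampMs;
        this.dropPercent = dropPercent;
    }

    boolean keep(BacklogRecord r) {
        if (r.timestampMs < minTimestampMs) {
            return false; // too old: skip it so consumption catches up with newer data
        }
        // Bucket by offset instead of Math.random(), so a replay after failover
        // drops exactly the same records.
        long bucket = Math.floorMod(r.offset, 100L);
        return bucket >= dropPercent;
    }

    List<BacklogRecord> filter(List<BacklogRecord> batch) {
        List<BacklogRecord> kept = new ArrayList<>();
        for (BacklogRecord r : batch) {
            if (keep(r)) {
                kept.add(r);
            }
        }
        return kept;
    }
}
```

Applying such a policy inside the connector, as described above, avoids deserializing records that will be dropped anyway; calling `keep` from a downstream filter is simpler, but every record is still read from Kafka.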

--

Best, yuchuan

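On 2022-03-12 17:45:59, "史延朋" wrote:
>Also, if the data volume is only high at certain moments, wouldn't connector-side rate limiting be a better option? It might also be worth opening an issue to discuss connector-side rate limiting (I'm not sure whether a related discussion already exists).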
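
Connector-side rate limiting is commonly built on a token bucket. The sketch below is a hypothetical `TokenBucket` class, not an existing Flink API. It takes the current time as a parameter so its behaviour is deterministic and testable; a source would pass `System.nanoTime()`.

```java
// Hypothetical token-bucket limiter (not a Flink API). Time is passed in
// explicitly, in nanoseconds, so the behaviour is deterministic.
final class TokenBucket {
    private final long capacity;
    private final long permitsPerSecond;
    private double tokens;
    private long lastRefillNanos;

    TokenBucket(long capacity, long permitsPerSecond, long nowNanos) {
        this.capacity = capacity;
        this.permitsPerSecond = permitsPerSecond;
        this.tokens = capacity; // start full so a short burst is allowed
        this.lastRefillNanos = nowNanos;
    }

    // Consumes `permits` tokens and returns true if enough are available;
    // otherwise returns false without consuming anything.
    synchronized boolean tryAcquire(long permits, long nowNanos) {
        refill(nowNanos);
        if (tokens >= permits) {
            tokens -= permits;
            return true;
        }
        return false;
    }

    private void refill(long nowNanos) {
        if (nowNanos <= lastRefillNanos) {
            return; // clock did not advance: nothing to add
        }
        long elapsed = nowNanos - lastRefillNanos;
        tokens = Math.min(capacity, tokens + elapsed * (double) permitsPerSecond / 1_000_000_000.0);
        lastRefillNanos = nowNanos;
    }
}
```

Unlike dropping data, throttling keeps every record: a source loop that pauses polling while `tryAcquire(batchSize, System.nanoTime())` returns false simply leaves the backlog in Kafka until the consumer catches up.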


YarnTaskExecutorRunner should contain MapReduce classes

2021-07-23 Thread chenkaibit

Hi:
I followed the instructions described in 
[https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive] 
to test the Hive streaming sink, and hit this exception:

  Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf

The thread [http://apache-flink.147419.n8.nabble.com/Flink-td7866.html] reports the same problem.

I checked the TaskManager JVM environment variables and the code, and found that Flink only sets up YARN_APPLICATION_CLASSPATH, not MAPREDUCE_APPLICATION_CLASSPATH.

See: 
[https://github.com/apache/flink/blob/ed39fb2efc790af038c1babd4a48847b7b39f91e/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java#L119]

I think we should add MAPREDUCE_APPLICATION_CLASSPATH as well, as Spark does.
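
The proposal amounts to merging the two Hadoop classpath settings before the container classpath is built. `ClasspathMerge` below is a hypothetical helper, and the actual change tracked in FLINK-23449 may look different; the sketch assumes both values are comma-separated lists, as `yarn.application.classpath` and `mapreduce.application.classpath` are in Hadoop configuration.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: combines YARN and MapReduce classpath entries.
final class ClasspathMerge {
    // Splits a comma-separated Hadoop classpath setting into trimmed, non-empty entries.
    static List<String> split(String value) {
        List<String> out = new ArrayList<>();
        if (value == null) {
            return out;
        }
        for (String part : value.split(",")) {
            String entry = part.trim();
            if (!entry.isEmpty()) {
                out.add(entry);
            }
        }
        return out;
    }

    // YARN entries first, then MapReduce entries; duplicates removed, order preserved.
    static List<String> merge(String yarnClasspath, String mapreduceClasspath) {
        Set<String> merged = new LinkedHashSet<>(split(yarnClasspath));
        merged.addAll(split(mapreduceClasspath));
        return new ArrayList<>(merged);
    }
}
```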

I created https://issues.apache.org/jira/browse/FLINK-23449 for this and hope someone can review it.

flink job will restart over and over again if a taskmanager's disk is damaged

2020-10-21 Thread chenkaibit
Hi everyone:
 I hit this exception when a hard disk was damaged:
https://issues.apache.org/jira/secure/attachment/13009035/13009035_flink_disk_error.png

I checked the code and found that Flink creates a temp file when a record's length exceeds 5 MB:
// SpillingAdaptiveSpanningRecordDeserializer.java
if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
    // create a spilling channel and put the data there
    this.spillingChannel = createSpillingChannel();

    ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
    FileUtils.writeCompletely(this.spillingChannel, toWrite);
}

The temp dir is picked at random from all `tempDirs`, and in YARN mode one `tempDir` usually corresponds to one hard disk.
In my opinion, if a hard disk is damaged, the TaskManager should pick another disk (temp dir) for the spilling channel rather than throw an IOException, which makes the Flink job restart over and over again.
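
A minimal sketch of the suggested behaviour, using only the JDK: start at a random temp dir as the current code does, but fall through to the other dirs on an `IOException` instead of failing the task. `SpillDirFailover` is a hypothetical name and not Flink's `createSpillingChannel()`; the file name prefix and suffix are arbitrary.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper: try every temp dir, starting at a random one.
final class SpillDirFailover {
    static FileChannel openSpillingChannel(List<Path> tempDirs) throws IOException {
        if (tempDirs.isEmpty()) {
            throw new IOException("no temp dirs configured");
        }
        int start = ThreadLocalRandom.current().nextInt(tempDirs.size());
        IOException lastError = null;
        for (int i = 0; i < tempDirs.size(); i++) {
            Path dir = tempDirs.get((start + i) % tempDirs.size());
            try {
                Path file = Files.createTempFile(dir, "spill-", ".inputchannel");
                // DELETE_ON_CLOSE removes the spill file once the channel is closed.
                return FileChannel.open(file, StandardOpenOption.READ,
                        StandardOpenOption.WRITE, StandardOpenOption.DELETE_ON_CLOSE);
            } catch (IOException e) {
                lastError = e; // e.g. a damaged disk or missing dir: try the next one
            }
        }
        throw new IOException("could not create a spilling file in any temp dir", lastError);
    }
}
```

A more complete fix would probably also remember which dirs failed, so that a dead disk is not retried for every large record.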

I have created a JIRA issue (https://issues.apache.org/jira/browse/FLINK-18811) to track this, and I look forward to someone reviewing the code or discussing the issue.
Thanks!

flink-1.11 native k8s: storing state on Ceph

2020-09-07 Thread chenkaibit
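Hi:
I'd like to ask whether anyone uses Ceph instead of HDFS to store Flink state and HA data in native k8s mode. In my tests I found that Flink on native k8s has no way to mount CephFS; does supporting this require code changes? I found a related issue on the Flink JIRA (https://issues.apache.org/jira/browse/FLINK-15649) and would like to ask how to mount CephFS with flink-1.11, and, if CephFS cannot be mounted, which storage systems other than HDFS can be used.
Any help would be appreciated.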

Job ID issue when using application mode in flink-1.11

2020-09-02 Thread chenkaibit
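hi:
 While testing application mode in flink-1.11, I found that with HA enabled the job ID is always 
; with HA disabled it is a random string (the same as in previous versions). Is this a bug, or is it by design?
 Any answers would be appreciated.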

Re:Re: Reply: flink1.9.1 monitored via Prometheus Pushgateway: after the job is killed, Grafana still shows its data

2020-09-02 Thread chenkaibit
hi:
 You could try prometheus-pushgateway-cleaner, which can run in Docker:

 https://github.com/jorinvo/prometheus-pushgateway-cleaner

 It periodically removes Pushgateway metrics that have not been updated within a specified time. Hope this helps.

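On 2020-09-02 11:25:25, "Jim Chen" wrote:
>Is it possible to get the time at which metrics were pushed to the Pushgateway through its REST API, and use that time to decide dynamically which groups to delete?
>
>xiao cai wrote on Tue, Sep 1, 2020 at 8:52 PM:
>
>> Hi shizk233:
>> I reproduced exactly the situation you described.
>>
>> You could try a scheduled task that checks the status of the Flink job and, once the job is no longer running, actively calls the Pushgateway delete method to remove its metrics.
>>
>>
>> Original message
>> From: shizk233
>> To: user-zh@flink.apache.org
>> Sent: Tue, Sep 1, 2020 19:10
>> Subject: Re: Reply: flink1.9.1 monitored via Prometheus Pushgateway: after the job is killed, Grafana still shows its data
>>
>>
>> Hi Xiao, in practice I found that this option only deletes the JobManager's metrics group, not the TaskManagers'.
>> We enabled randomJobNameSuffix, which puts the JM and TM metrics into different metrics groups.
>> Could this be a bug?
>>
>> xiao cai wrote on Tue, Sep 1, 2020 at 4:57 PM:
>> > Hi:
>> > You could try adding this to flink-conf.yaml:
>> > metrics.reporter.promgateway.deleteOnShutdown: true
>> >
>> > Best,
>> > Xiao
>> > Original message
>> > From: bradyMk
>> > To: user-zh
>> > Sent: Tue, Sep 1, 2020 16:50
>> > Subject: Re: Reply: flink1.9.1 monitored via Prometheus Pushgateway: after the job is killed, Grafana still shows its data
>> >
>> > Hi, I don't quite follow. For example, I have the metric flink_jobmanager_job_uptime, which tracks how long a job has been running;
>> > if the job is killed, the metric freezes at a constant value that keeps showing up in Grafana. I don't know PromQL well; I tried
>> > flink_jobmanager_job_uptime[1m], but that is an invalid query. Following your suggestion, how should I change it?
>> > - Best Wishes
>> > -- Sent from: http://apache-flink.147419.n8.nabble.com/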
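
Following the cleaner idea and Jim Chen's question: the Pushgateway records a `push_time_seconds` metric for each group, which can be read back (for example from its `/api/v1/metrics` endpoint) and compared against a TTL. The sketch covers only the decision step. `PushgatewayJanitor` is hypothetical, fetching and parsing the JSON is omitted, and it assumes each group is keyed by the `job` label alone, so the delete path is `/metrics/job/<job>`. With `randomJobNameSuffix` enabled, JM and TM groups have different job names, so each needs its own DELETE request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: decides which Pushgateway groups are stale.
final class PushgatewayJanitor {
    // lastPushSeconds maps a group's job label to its push_time_seconds value
    // (Unix time in seconds). Returns a DELETE path for every group that has not
    // been pushed to for more than ttlSeconds, sorted by job name.
    static List<String> staleGroupDeletePaths(Map<String, Double> lastPushSeconds,
                                              long nowSeconds, long ttlSeconds) {
        List<String> paths = new ArrayList<>();
        for (Map.Entry<String, Double> e : new TreeMap<>(lastPushSeconds).entrySet()) {
            double ageSeconds = nowSeconds - e.getValue();
            if (ageSeconds > ttlSeconds) {
                // Assumes the job name is URL-safe and is the only grouping label.
                paths.add("/metrics/job/" + e.getKey());
            }
        }
        return paths;
    }
}
```

Each returned path would then be sent as an HTTP DELETE to the Pushgateway, for example with `java.net.http.HttpClient` on Java 11 or later.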


Re:Re: Flink 1.10 on Yarn

2020-08-06 Thread chenkaibit
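hi xuhaiLong, judging from the logs, the checkpoint NullPointerException is a known issue; see the two JIRA tickets below for details.
Which JDK version are you using? So far we have found that jdk8_40/jdk8_60 + flink-1.10 produces the checkpoint NullPointerException, so you could try upgrading the JDK.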
https://issues.apache.org/jira/browse/FLINK-18196
https://issues.apache.org/jira/browse/FLINK-17479
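
To check quickly whether a TaskManager runs one of the JDK builds mentioned above, the update number can be parsed from the `java.version` system property. `JdkUpdateCheck` is a hypothetical helper; it only flags the two builds reported in this thread (8u40 and 8u60) and says nothing about other old builds.

```java
// Hypothetical helper for Java 8 version strings such as "1.8.0_60".
final class JdkUpdateCheck {
    // Returns the update number of a "1.8.0_NN" version string, or -1 otherwise.
    static int java8Update(String version) {
        if (version == null || !version.startsWith("1.8.0_")) {
            return -1;
        }
        String rest = version.substring("1.8.0_".length());
        int end = 0;
        while (end < rest.length() && Character.isDigit(rest.charAt(end))) {
            end++;
        }
        if (end == 0) {
            return -1;
        }
        return Integer.parseInt(rest.substring(0, end));
    }

    // Flags only the builds reported in the thread (8u40 and 8u60).
    static boolean reportedAffected(String version) {
        int update = java8Update(version);
        return update == 40 || update == 60;
    }
}
```

Usage: `JdkUpdateCheck.reportedAffected(System.getProperty("java.version"))`.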

On 2020-08-07 12:50:23, "xuhaiLong" wrote:

Sorry, I attached the wrong file.


Yes, taskmanager.memory.jvm-metaspace.size is at its default value.
On 8/7/2020 11:43, Yangze Guo wrote:
The log did not come through. Is taskmanager.memory.jvm-metaspace.size currently at its default value?

Best,
Yangze Guo

On Fri, Aug 7, 2020 at 11:38 AM xuhaiLong  wrote:



Hi


Scenario: 1 TM with three slots, running three jobs.


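While the three jobs were running, a NullPointerException occurred during checkpointing, which made the jobs restart continuously. Eventually `Metaspace` filled up and the following error appeared:
`java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has 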
occurred. This can mean two things: either the job requires a larger size of 
JVM metaspace to load classes or there is a class loading leak. In the first 
case 'taskmanager.memory.jvm-metaspace.size' configuration option should be 
increased. If the error persists (usually in cluster after several job 
(re-)submissions) then there is probably a class loading leak which has to be 
investigated and fixed. The task executor has to be shutdown...
`


The attachment contains part of the exception output.


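Questions:
1. Why does a NullPointerException occur during checkpointing? (All three jobs read the same Kafka topic, and restoring a job from a checkpoint works normally, so it should not be a data problem.)
2. According to the logs the job can restart, so why does the problem persist after the automatic restart and keep the job restarting?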


Thanks!
Attachment: 08-07error.txt (730.4 KB, expires 2020-08-22 11:37)


Re:Re: Reply: flink1.10 checkpoint NullPointerException after running for a while

2020-07-06 Thread chenkaibit
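hi, Congxian. I was also puzzled when I found this problem, but after adding some logging I did confirm my suspicion. Since neither <older JDK + flink 1.9> nor <newer JDK + 1.10> throws the NPE (see FLINK-17479), I suspect it is related to how outer variables captured by the lambda are garbage-collected, together with the MailBox model introduced in 1.10: the outer checkpointMetaData instance gets unexpectedly reclaimed. So in the fix patch I instantiate a new checkpointMetaData inside the lambda. So far this approach has worked and no further NPEs have been observed. It is a temporary fix; the root cause may need further analysis.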
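
The shape of the workaround (copy the checkpoint metadata at the start of the deferred lambda and use only the copy afterwards) can be sketched with hypothetical stand-ins for `CheckpointMetaData` and the mailbox; this is not the actual patch. In Java, a captured variable's value is copied into the lambda object when the lambda is created, so on a correctly behaving JVM this sketch cannot reproduce the NPE; it only shows where the copy is made.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Flink's CheckpointMetaData (id and timestamp only).
final class CheckpointMetaSketch {
    final long checkpointId;
    final long timestamp;

    CheckpointMetaSketch(long checkpointId, long timestamp) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
    }
}

// Hypothetical single-threaded mailbox: actions are queued and run later, in order.
final class MailboxSketch {
    private final Queue<Runnable> mails = new ArrayDeque<>();

    void submit(Runnable action) {
        mails.add(action);
    }

    void drain() {
        Runnable next;
        while ((next = mails.poll()) != null) {
            next.run();
        }
    }
}

final class CopyInsideLambda {
    // The first statement of the deferred lambda copies the captured metadata
    // into a fresh object; only the copy is used after that.
    static void performCheckpoint(MailboxSketch mailbox, CheckpointMetaSketch meta,
                                  AtomicLong triggeredId) {
        mailbox.submit(() -> {
            CheckpointMetaSketch local =
                    new CheckpointMetaSketch(meta.checkpointId, meta.timestamp);
            triggeredId.set(local.checkpointId); // stands in for the real checkpoint work
        });
    }
}
```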

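On 2020-07-06 14:04:58, "Congxian Qiu" wrote:
>@陈凯 Thanks for sharing this approach. I'm curious what the difference between the two is. The modified patch copies the
>CheckpointMeta at the very beginning of the closure, which implies that between lines 845 and 867 the previous checkpointMeta becomes null; that is rather strange.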
>
>Best,
>Congxian
>
>
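>陈凯 wrote on Mon, Jul 6, 2020 at 9:53 AM:
>
>>
>> Hi zhisheng, 程龙. We have also hit this problem, on jdk8_40; with an older JDK the NPE does occur with high probability.
>> I previously filed a JIRA describing the problem:
>> https://issues.apache.org/jira/browse/FLINK-18196
>>
>> After modifying the checkpoint-related code, we have not seen the NPE again, even on the older JDK. If you really cannot upgrade the JDK, you can refer to the following patch: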
>>
>> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
>>
>>
>>
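>> -----Original Message-----
>> From: zhisheng
>> Sent: July 5, 2020 15:01
>> To: user-zh
>> Subject: Re: Reply: flink1.10 checkpoint NullPointerException after running for a while
>>
>> The JDK on our production cluster is used in many places, so we don't dare change its version lightly. We'll keep watching, and if the problem occurs frequently we'll consider switching versions. Thanks, Congxian.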
>>
>> Best!
>> zhisheng
>>
>> Congxian Qiu wrote on Sat, Jul 4, 2020 at 3:21 PM:
>>
>> > @zhisheng Have you tried switching the JDK version? Does that solve the problem?
>> >
>> > Best,
>> > Congxian
>> >
>> >
>> > zhisheng wrote on Sat, Jul 4, 2020 at 12:27 PM:
>> >
>> > > We have also run into this exception, but it is not very common.
>> > >
>> > > Congxian Qiu wrote on Fri, Jul 3, 2020 at 2:08 PM:
>> > >
>> > > > You can check whether FLINK-17479[1] matches your problem; if so, try changing the JDK version.
>> > > > [1]  https://issues.apache.org/jira/browse/FLINK-17479
>> > > > Best,
>> > > > Congxian
>> > > >
>> > > >
>> > > > 程龙 <13162790...@163.com> wrote on Wed, Jul 1, 2020 at 9:09 PM:
>> > > >
>> > > > > They are all errors about failing to obtain resources (slots); it is probably still caused by the checkpoint being null, but I don't know why it is null.
>> > > > >
>> > > > > On 2020-07-01 20:51:34, "JasonLee" <17610775...@163.com> wrote:
>> > > > > >Go to the specific TM, find the relevant operator, and check whether it has any exception messages.
>> > > > > >
>> > > > > >JasonLee
>> > > > > >
>> > > > > >On 2020-07-01 20:43, 程龙 wrote:
>> > > > > >On flink1.10, after the program has been running for a few hours, it fails to perform checkpoints with a NullPointerException, as follows:
>> > > > > >
>> > > > > >
>> > > > > >java.lang.Exception: Could not perform checkpoint 3201 for operator Filter -> Map (2/8).
>> > > > > >   at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
>> > > > > >   at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
>> > > > > >   at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>> > > > > >   at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>> > > > > >   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
>> > > > > >   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>> > > > > >   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
>> > > > > >   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>> > > > > >   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
>> > > > > >   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
>> > > > > >   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>> > > > > >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>> > > > > >   at java.lang.Thread.run(Thread.java:745)
>> > > > > >Caused by: java.lang.NullPointerException
>> > > > > >   at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
>> > > > > >   at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
>> > > > > >   at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
>> > > > > >   at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

Re: flink 1.10 SQL error, asking for help

2020-06-08 Thread chenkaibit
I have also run into this problem. It may be a checkpoint bug; after I modified the Flink source code the error went away. You can refer to this patch:
https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19


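On 2020-06-05 15:06:48, "hb" <343122...@163.com> wrote:
>A Flink SQL job ran normally at first, checkpoints were fine too, and there is no large state (the state is 19.7 KB),
>but after running for a while it started failing; after recovering it runs for a few seconds and then hits the same error again, cycling endlessly between failure and recovery.
>Could someone please take a look? Much appreciated.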
>
>
>2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task - 
>Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> 
>SourceConversion(table=[default_catalog.default_database.user_visit_trace, 
>source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], 
>fields=[userId, utp, utrp, extendFields, requestTime]) -> 
>Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], 
>where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
>(((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'MMdd') > 
>_UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS NOT 
>NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], 
>joinType=[InnerJoin], async=[false], lookup=[user_id=userId], where=[shop_id 
>IS NOT NULL], select=[item_id, userId, requestTime, shop_id, user_id]) -> 
>Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / 1000) 
>FROM_UNIXTIME _UTF-16LE'MMdd') CONCAT 
>_UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT _UTF-16LE':' 
>CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> 
>SinkConversionToTuple2 -> Sink: Unnamed (1/8) 
>(ad7a2d51beccbb39f83ac2c16b923bd0) switched from RUNNING to FAILED.
>java.lang.Exception: Could not perform checkpoint 401 for operator Source: 
>KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> 
>SourceConversion(table=[default_catalog.default_database.user_visit_trace, 
>source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], 
>fields=[userId, utp, utrp, extendFields, requestTime]) -> 
>Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], 
>where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
>(((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'MMdd') > 
>_UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS NOT 
>NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], 
>joinType=[InnerJoin], async=[false], lookup=[user_id=userId], where=[shop_id 
>IS NOT NULL], select=[item_id, userId, requestTime, shop_id, user_id]) -> 
>Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / 1000) 
>FROM_UNIXTIME _UTF-16LE'MMdd') CONCAT 
>_UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT _UTF-16LE':' 
>CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> 
>SinkConversionToTuple2 -> Sink: Unnamed (1/8).
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$595/1311782208.call(Unknown Source)
>    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>    at java.lang.Thread.run(Thread.java:745)
>Caused by: java.lang.NullPointerException
>    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$597/1708409807.run(Unknown Source)
>    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
>    ... 12 more
>2020-06-05 14:13:19,795 INFO org.apache.flink.runtime.taskmanager.Task - 
>Attempting to fail task externally 

Re:Re:Re: flink-1.10 checkpoint occasionally throws NullPointerException

2020-05-08 Thread chenkaibit
Hi:
After adding some logging, I found that checkpointMetaData had become null:
https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1421
The test program reads from Kafka, runs a word count, and writes the results back to Kafka. The checkpoint configuration is as follows:
| Option | Value |
| --- | --- |
| Checkpointing Mode | Exactly Once |
| Interval | 5s |
| Timeout | 10m 0s |
| Minimum Pause Between Checkpoints | 0ms |
| Maximum Concurrent Checkpoints | 1 |

The NPE is consistently thrown at checkpoint 5377.
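
With the settings in the table, a quick estimate of how long the job had been running when checkpoint 5377 was triggered is 5377 × 5 s = 26,885 s, roughly 7.5 hours. The sketch assumes a checkpoint starts every interval and finishes well within it; real timing drifts with checkpoint duration and the initial delay, so treat the result as an approximation. `CheckpointTimeline` is a hypothetical helper.

```java
import java.time.Duration;

final class CheckpointTimeline {
    // Approximate job uptime when checkpoint `n` is triggered, assuming one
    // checkpoint starts every `interval` (min pause 0, max concurrent 1) and
    // each one completes well within the interval.
    static Duration approxUptimeAt(long n, Duration interval) {
        return interval.multipliedBy(n);
    }
}
```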

Although the cause is still unclear, after I modified part of the code (see https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19) the NPE no longer appears.


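On 2020-04-21 10:21:56, "chenkaibit" wrote:
>
>This is not reliably reproducible, but it has shown up in several jobs we recently tested on 1.10, and no other errors were reported when it was triggered. I have added some logging and will keep watching.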
>
>On 2020-04-21 01:12:48, "Yun Tang" wrote:
>>Hi
>>
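>>This NPE is a bit strange: from the executeCheckpointing method[1] it is hard to pinpoint which variable, or which variable's value, is null.
>>One way to investigate is to enable DEBUG-level logging for org.apache.flink.streaming.runtime.tasks and use the debug logs to narrow down which variable is null.
>>
>>When this exception occurs, are there any other exceptions in the logs of the related task? What triggers this NPE, and is it reliably reproducible?
>>
>>[1] 
>>https://github.com/apache/flink/blob/aa4eb8f0c9ce74e6b92c3d9be5dc8e8cb536239d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1349
>>
>>Best regards,
>>唐云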
>>
>>
>>From: chenkaibit 
>>Sent: Monday, April 20, 2020 18:39
>>To: user-zh@flink.apache.org 
>>Subject: flink-1.10 checkpoint 偶尔报 NullPointerException
>>
>>Has anyone run into this error? A NullPointerException is thrown in CheckpointingOperation.executeCheckpointing.


Re:Re: flink-1.10 checkpoint occasionally throws NullPointerException

2020-04-20 Thread chenkaibit

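This is not reliably reproducible, but it has shown up in several jobs we recently tested on 1.10, and no other errors were reported when it was triggered. I have added some logging and will keep watching.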

On 2020-04-21 01:12:48, "Yun Tang" wrote:
>Hi
>
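>This NPE is a bit strange: from the executeCheckpointing method[1] it is hard to pinpoint which variable, or which variable's value, is null.
>One way to investigate is to enable DEBUG-level logging for org.apache.flink.streaming.runtime.tasks and use the debug logs to narrow down which variable is null.
>
>When this exception occurs, are there any other exceptions in the logs of the related task? What triggers this NPE, and is it reliably reproducible?
>
>[1] 
>https://github.com/apache/flink/blob/aa4eb8f0c9ce74e6b92c3d9be5dc8e8cb536239d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1349
>
>Best regards,
>唐云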
>
>
>From: chenkaibit 
>Sent: Monday, April 20, 2020 18:39
>To: user-zh@flink.apache.org 
>Subject: flink-1.10 checkpoint 偶尔报 NullPointerException
>
>Has anyone run into this error? A NullPointerException is thrown in CheckpointingOperation.executeCheckpointing.


flink-1.10 checkpoint occasionally throws NullPointerException

2020-04-20 Thread chenkaibit
Has anyone run into this error? A NullPointerException is thrown in CheckpointingOperation.executeCheckpointing:
java.lang.Exception: Could not perform checkpoint 5505 for operator Source: 
KafkaTableSource(xxx) -> SourceConversion(table=[xxx, source: 
[KafkaTableSource(xxx)]], fields=[xxx]) -> Calc(select=[xxx) AS xxx]) -> 
SinkConversionToTuple2 -> Sink: Elasticsearch6UpsertTableSink(xxx) (1/1).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$228/1024478318.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$229/1010499540.run(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
    ... 12 more

When will flink HiveTableSink support writing in streaming mode?

2020-03-10 Thread chenkaibit
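Hi:
I see that https://issues.apache.org/jira/browse/FLINK-14255 introduced a FileSystemStreamingSink, which seems to prepare HiveTableSink for writing in streaming mode. In which upcoming version is this feature expected to be officially released?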