Re: flink1.9.1-消费kafka落pg库任务出错

2020-07-14 文章 shizk233
Hi nicygan,

unable to create new native thread指的是无法创建checkpoint线程,并不是内存占用过大。
这种情况一般有3种可能的原因:
1.flink应用开启太多线程
2.机器上句柄设置太小
3.机器上的其他应用开启太多线程

建议排查一下机器上的ulimit设置(文件句柄会影响应用能开启的线程数),和flink metrics里监控到的线程数变化。

Best,
shizk233



nicygan  于2020年7月14日周二 上午10:31写道:

> dear all:
>
> 我有一个消费kafka数据写到pg库的任务,任务发生过重启,yarn日志显示jobmanager发生oom,但找不到具体原因,因为数据量非常小,按道理不该发生oom。
>   详细如下:
>
>
> 1、部署方式:
> flink on yarn ,pre-job,每个container 1024 M
> jobmanager的jvmoption(默认的)  -Xms424m-Xmx424m
>
>
> 2、数据情况:
> kafka数据,约1分钟1条,文本数据,每条数据都非常小。
>
>
> 3、任务情况:
> 很简单,消费kafka然后直接写到pg库,中间没有任何处理,没有自定义的状态。
> 消费采用 FlinkKafkaConsumer
> 写库采用 JDBCAppendTableSink
> 并行度 1
> checkpoint 2分钟一次,每次checkpoint约100ms
> statebackend rocksdb
>
>
> 4、报错情况:
> 2020-07-10 11:51:54,237 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 555 @ 1594353114226 for job cd5ceeedeb35e8e094991edf09233483.
> 2020-07-10 11:51:54,421 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 555 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 77
> ms).
> 2020-07-10 11:53:54,253 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 556 @ 1594353234226 for job cd5ceeedeb35e8e094991edf09233483.
> 2020-07-10 11:53:54,457 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 556 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 124
> ms).
> 2020-07-10 11:55:54,246 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 557 @ 1594353354226 for job cd5ceeedeb35e8e094991edf09233483.
> 2020-07-10 11:55:54,402 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 557 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 115
> ms).
> 2020-07-10 11:56:34,155 ERROR
> org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL:
> Thread 'flink-akka.actor.default-dispatcher-4673' produced an uncaught
> exception. Stopping the process...
> java.lang.OutOfMemoryError: unable to create new native thread
> at java.lang.Thread.start0(Native Method)
> at java.lang.Thread.start(Thread.java:717)
> at
> akka.dispatch.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672)
> at
> akka.dispatch.forkjoin.ForkJoinPool.signalWork(ForkJoinPool.java:1966)
> at
> akka.dispatch.forkjoin.ForkJoinPool.fullExternalPush(ForkJoinPool.java:1905)
> at
> akka.dispatch.forkjoin.ForkJoinPool.externalPush(ForkJoinPool.java:1834)
> at
> akka.dispatch.forkjoin.ForkJoinPool.execute(ForkJoinPool.java:2955)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool.execute(ForkJoinExecutorConfigurator.scala:30)
> at
> akka.dispatch.ExecutorServiceDelegate.execute(ThreadPoolBuilder.scala:211)
> at
> akka.dispatch.ExecutorServiceDelegate.execute$(ThreadPoolBuilder.scala:211)
> at
> akka.dispatch.Dispatcher$LazyExecutorServiceDelegate.execute(Dispatcher.scala:39)
> at
> akka.dispatch.Dispatcher.registerForExecution(Dispatcher.scala:115)
> at akka.dispatch.Dispatcher.dispatch(Dispatcher.scala:55)
> at akka.actor.dungeon.Dispatch.sendMessage(Dispatch.scala:142)
> at akka.actor.dungeon.Dispatch.sendMessage$(Dispatch.scala:136)
> at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
> at akka.actor.Cell.sendMessage(ActorCell.scala:350)
> at akka.actor.Cell.sendMessage$(ActorCell.scala:349)
> at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
> at
> akka.actor.RepointableActorRef.$bang(RepointableActorRef.scala:173)
> at akka.actor.Scheduler$$anon$3.run(Scheduler.scala:171)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> thanks all / by nicygan
>


flink1.9.1-消费kafka落pg库任务出错

2020-07-13 文章 nicygan
dear all:
  
我有一个消费kafka数据写到pg库的任务,任务发生过重启,yarn日志显示jobmanager发生oom,但找不到具体原因,因为数据量非常小,按道理不该发生oom。
  详细如下:


1、部署方式:
flink on yarn ,pre-job,每个container 1024 M
jobmanager的jvmoption(默认的)  -Xms424m-Xmx424m


2、数据情况:
kafka数据,约1分钟1条,文本数据,每条数据都非常小。


3、任务情况:
很简单,消费kafka然后直接写到pg库,中间没有任何处理,没有自定义的状态。
消费采用 FlinkKafkaConsumer
写库采用 JDBCAppendTableSink
并行度 1
checkpoint 2分钟一次,每次checkpoint约100ms
statebackend rocksdb


4、报错情况:
2020-07-10 11:51:54,237 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 555 @ 1594353114226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:51:54,421 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 555 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 77 ms).
2020-07-10 11:53:54,253 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 556 @ 1594353234226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:53:54,457 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 556 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 124 ms).
2020-07-10 11:55:54,246 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 557 @ 1594353354226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:55:54,402 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 557 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 115 ms).
2020-07-10 11:56:34,155 ERROR 
org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL: Thread 
'flink-akka.actor.default-dispatcher-4673' produced an uncaught exception. 
Stopping the process...
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at 
akka.dispatch.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672)
at 
akka.dispatch.forkjoin.ForkJoinPool.signalWork(ForkJoinPool.java:1966)
at 
akka.dispatch.forkjoin.ForkJoinPool.fullExternalPush(ForkJoinPool.java:1905)
at 
akka.dispatch.forkjoin.ForkJoinPool.externalPush(ForkJoinPool.java:1834)
at akka.dispatch.forkjoin.ForkJoinPool.execute(ForkJoinPool.java:2955)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool.execute(ForkJoinExecutorConfigurator.scala:30)
at 
akka.dispatch.ExecutorServiceDelegate.execute(ThreadPoolBuilder.scala:211)
at 
akka.dispatch.ExecutorServiceDelegate.execute$(ThreadPoolBuilder.scala:211)
at 
akka.dispatch.Dispatcher$LazyExecutorServiceDelegate.execute(Dispatcher.scala:39)
at akka.dispatch.Dispatcher.registerForExecution(Dispatcher.scala:115)
at akka.dispatch.Dispatcher.dispatch(Dispatcher.scala:55)
at akka.actor.dungeon.Dispatch.sendMessage(Dispatch.scala:142)
at akka.actor.dungeon.Dispatch.sendMessage$(Dispatch.scala:136)
at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
at akka.actor.Cell.sendMessage(ActorCell.scala:350)
at akka.actor.Cell.sendMessage$(ActorCell.scala:349)
at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
at akka.actor.RepointableActorRef.$bang(RepointableActorRef.scala:173)
at akka.actor.Scheduler$$anon$3.run(Scheduler.scala:171)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

thanks all / by nicygan