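Dear all,

I have a job that consumes data from Kafka and writes it to a PostgreSQL database. The job has restarted, and the YARN logs show that the JobManager hit an OOM, but I cannot find the root cause. The data volume is very small, so in principle an OOM should not happen.
Details follow: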


1. Deployment:
Flink on YARN, per-job mode, 1024 MB per container
JobManager JVM options (defaults): -Xms424m -Xmx424m
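
For reference, the 424 MB default above can be reproduced with a small calculation. This is only a sketch, and it assumes a Flink version before 1.11 where the JobManager heap on YARN is derived from the legacy containerized.heap-cutoff-ratio (assumed default 0.25) and containerized.heap-cutoff-min (assumed default 600 MB) settings. The class and method names are made up for illustration.

```java
// Sketch of the legacy (pre-1.11) JobManager heap sizing on YARN.
// Assumed defaults, not taken from the post:
//   containerized.heap-cutoff-ratio = 0.25
//   containerized.heap-cutoff-min   = 600 (MB)
final class JmHeapEstimate {

    /** Heap in MB = container size minus the larger of (ratio * container) and the minimum cutoff. */
    static int heapMb(int containerMb, double cutoffRatio, int cutoffMinMb) {
        int cutoff = Math.max((int) (containerMb * cutoffRatio), cutoffMinMb);
        return containerMb - cutoff;
    }

    public static void main(String[] args) {
        // 1024 MB container: cutoff = max(256, 600) = 600, so heap = 424 MB.
        System.out.println("-Xmx" + heapMb(1024, 0.25, 600) + "m");
    }
}
```

Under those assumptions, roughly 600 MB of the container is left outside the heap for JVM overhead, metaspace, and native memory such as thread stacks.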


2. Data:
Kafka data, roughly one record per minute, plain text, each record very small.


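3. Job:
Very simple: it consumes from Kafka and writes directly to PostgreSQL, with no processing in between and no custom state.
Source: FlinkKafkaConsumer
Sink: JDBCAppendTableSink
Parallelism: 1
Checkpointing: every 2 minutes, each checkpoint takes about 100 ms
State backend: RocksDB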


4. Error:
2020-07-10 11:51:54,237 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 555 @ 1594353114226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:51:54,421 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 555 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 77 ms).
2020-07-10 11:53:54,253 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 556 @ 1594353234226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:53:54,457 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 556 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 124 ms).
2020-07-10 11:55:54,246 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 557 @ 1594353354226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:55:54,402 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 557 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 115 ms).
2020-07-10 11:56:34,155 ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler       - FATAL: Thread 'flink-akka.actor.default-dispatcher-4673' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:717)
        at akka.dispatch.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672)
        at akka.dispatch.forkjoin.ForkJoinPool.signalWork(ForkJoinPool.java:1966)
        at akka.dispatch.forkjoin.ForkJoinPool.fullExternalPush(ForkJoinPool.java:1905)
        at akka.dispatch.forkjoin.ForkJoinPool.externalPush(ForkJoinPool.java:1834)
        at akka.dispatch.forkjoin.ForkJoinPool.execute(ForkJoinPool.java:2955)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool.execute(ForkJoinExecutorConfigurator.scala:30)
        at akka.dispatch.ExecutorServiceDelegate.execute(ThreadPoolBuilder.scala:211)
        at akka.dispatch.ExecutorServiceDelegate.execute$(ThreadPoolBuilder.scala:211)
        at akka.dispatch.Dispatcher$LazyExecutorServiceDelegate.execute(Dispatcher.scala:39)
        at akka.dispatch.Dispatcher.registerForExecution(Dispatcher.scala:115)
        at akka.dispatch.Dispatcher.dispatch(Dispatcher.scala:55)
        at akka.actor.dungeon.Dispatch.sendMessage(Dispatch.scala:142)
        at akka.actor.dungeon.Dispatch.sendMessage$(Dispatch.scala:136)
        at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
        at akka.actor.Cell.sendMessage(ActorCell.scala:350)
        at akka.actor.Cell.sendMessage$(ActorCell.scala:349)
        at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
        at akka.actor.RepointableActorRef.$bang(RepointableActorRef.scala:173)
        at akka.actor.Scheduler$$anon$3.run(Scheduler.scala:171)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
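
A note on the error above: "unable to create new native thread" is raised when the operating system refuses to start another thread, so it points at thread count or native memory limits rather than the 424 MB heap. One way to check, sketched below with the standard java.lang.management API, is to count live threads grouped by name prefix. The class name ThreadCensus and the grouping rule are illustrative assumptions, and the code only sees the JVM it runs in; for the real JobManager one would more likely take a thread dump of that process (for example with jstack) and group the names the same way.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Map;
import java.util.TreeMap;

// Illustrative helper (hypothetical name): counts live threads per name prefix.
final class ThreadCensus {

    /** Strips a trailing numeric suffix such as "-4673" so pool threads group together. */
    static String groupKey(String threadName) {
        return threadName.replaceAll("[-#]?\\d+$", "");
    }

    /** Returns the number of live threads in this JVM, keyed by grouped thread name. */
    static Map<String, Integer> countByGroup() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        Map<String, Integer> counts = new TreeMap<>();
        for (ThreadInfo info : mx.getThreadInfo(mx.getAllThreadIds())) {
            if (info == null) {
                continue; // the thread exited between the two calls
            }
            counts.merge(groupKey(info.getThreadName()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        System.out.println("live=" + mx.getThreadCount()
                + " peak=" + mx.getPeakThreadCount()
                + " totalStarted=" + mx.getTotalStartedThreadCount());
        countByGroup().forEach((group, n) -> System.out.println(n + "\t" + group));
    }
}
```

If one group keeps growing between two snapshots, that component is a candidate for a thread leak. If the counts stay small, the per-user process limit (ulimit -u) on the NodeManager host may be the thing to look at instead, since other processes of the same user count against it.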

thanks all / by nicygan
