dear all: 我有一个消费kafka数据写到pg库的任务,任务发生过重启,yarn日志显示jobmanager发生oom,但找不到具体原因,因为数据量非常小,按道理不该发生oom。 详细如下:
1、部署方式: flink on yarn ,pre-job,每个container 1024 M jobmanager的jvmoption(默认的) -Xms424m -Xmx424m 2、数据情况: kafka数据,约1分钟1条,文本数据,每条数据都非常小。 3、任务情况: 很简单,消费kafka然后直接写到pg库,中间没有任何处理,没有自定义的状态。 消费采用 FlinkKafkaConsumer 写库采用 JDBCAppendTableSink 并行度 1 checkpoint 2分钟一次,每次checkpoint约100ms statebackend rocksdb 4、报错情况: 2020-07-10 11:51:54,237 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 555 @ 1594353114226 for job cd5ceeedeb35e8e094991edf09233483. 2020-07-10 11:51:54,421 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 555 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 77 ms). 2020-07-10 11:53:54,253 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 556 @ 1594353234226 for job cd5ceeedeb35e8e094991edf09233483. 2020-07-10 11:53:54,457 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 556 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 124 ms). 2020-07-10 11:55:54,246 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 557 @ 1594353354226 for job cd5ceeedeb35e8e094991edf09233483. 2020-07-10 11:55:54,402 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 557 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 115 ms). 2020-07-10 11:56:34,155 ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread 'flink-akka.actor.default-dispatcher-4673' produced an uncaught exception. Stopping the process... java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:717) at akka.dispatch.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672) at akka.dispatch.forkjoin.ForkJoinPool.signalWork(ForkJoinPool.java:1966) at akka.dispatch.forkjoin.ForkJoinPool.fullExternalPush(ForkJoinPool.java:1905) at akka.dispatch.forkjoin.ForkJoinPool.externalPush(ForkJoinPool.java:1834) at akka.dispatch.forkjoin.ForkJoinPool.execute(ForkJoinPool.java:2955) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool.execute(ForkJoinExecutorConfigurator.scala:30) at akka.dispatch.ExecutorServiceDelegate.execute(ThreadPoolBuilder.scala:211) at akka.dispatch.ExecutorServiceDelegate.execute$(ThreadPoolBuilder.scala:211) at akka.dispatch.Dispatcher$LazyExecutorServiceDelegate.execute(Dispatcher.scala:39) at akka.dispatch.Dispatcher.registerForExecution(Dispatcher.scala:115) at akka.dispatch.Dispatcher.dispatch(Dispatcher.scala:55) at akka.actor.dungeon.Dispatch.sendMessage(Dispatch.scala:142) at akka.actor.dungeon.Dispatch.sendMessage$(Dispatch.scala:136) at akka.actor.ActorCell.sendMessage(ActorCell.scala:429) at akka.actor.Cell.sendMessage(ActorCell.scala:350) at akka.actor.Cell.sendMessage$(ActorCell.scala:349) at akka.actor.ActorCell.sendMessage(ActorCell.scala:429) at akka.actor.RepointableActorRef.$bang(RepointableActorRef.scala:173) at akka.actor.Scheduler$$anon$3.run(Scheduler.scala:171) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) thanks all / by nicygan