Hi,
Increasing taskmanager.memory.task.off-heap.size should solve part of the problem.
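As a rough sketch of that change in flink-conf.yaml (the 256m value is only an illustrative assumption, not a tuned recommendation, and the framework line just restates the current default):

```yaml
# flink-conf.yaml
# Task off-heap defaults to 0; 256m here is a hypothetical starting point.
taskmanager.memory.task.off-heap.size: 256m
# Framework off-heap, unchanged from the 128MB default.
taskmanager.memory.framework.off-heap.size: 128m
```

The TaskManagers have to be restarted for the new size to take effect.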
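I still have some questions, though. This is a session cluster that runs only this one job at a time, and the next job starts only after the previous one has finished. If the off-heap memory was allocated by the previous job's reads/writes, is it released immediately when that job finishes?
The exception only shows up after several runs: the earlier jobs all succeed and the later ones fail, which looks like a memory leak. Is there more documentation on how the Flink TaskManager manages off-heap memory? (I have already read the official docs.)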
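One way to check whether direct memory really shrinks between jobs might be to watch the JVM's "direct" buffer pool. The sketch below is a standalone illustration, not Flink code: the class name DirectMemoryProbe and the 16 MB buffer are made up for the example, and it only relies on the standard java.lang.management.BufferPoolMXBean API. It also shows that direct memory is returned only once the buffer object has been garbage collected, so System.gc() is just a hint.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

public class DirectMemoryProbe {

    /** Bytes currently held by the JVM's "direct" buffer pool, or -1 if that pool is not exposed. */
    public static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directMemoryUsed();

        // Same allocation call that appears at the top of the stack trace.
        ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024 * 1024);
        long during = directMemoryUsed();
        System.out.println("before=" + before + " during=" + during
                + " capacity=" + buffer.capacity());

        // Dropping the reference does not free the memory by itself;
        // it is released only after the buffer object is collected.
        buffer = null;
        System.gc(); // a hint only, the JVM may ignore it
        System.out.println("after gc hint=" + directMemoryUsed());
    }
}
```

If the value keeps growing from one job to the next inside the TaskManager JVM, that would point towards buffers being retained rather than freed.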
Thanks

At 2022-10-10 12:34:55, "yanfei lei" <fredia...@gmail.com> wrote:
>Judging from the error, this is caused by insufficient direct memory. You could try increasing taskmanager.memory.task.off-heap.size.
>
>Best,
>Yanfei
>
>allanqinjy <allanqi...@163.com> wrote on Sat, Oct 8, 2022 at 21:19:
>
>>
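>> The stack trace shows that memory is insufficient, so try increasing it. I previously hit an out-of-memory error as well, when reading an offline geolocation database from HDFS, and solved it by adjusting the memory size. That job was built with the streaming API on version 1.12.5.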
>>
>>
>> allanqinjy
>> allanqi...@163.com
>>
>>
>> On 10/8/2022 21:00,RS<tinyshr...@163.com> wrote:
>> Hi,
>>
>>
>> Version: Flink-1.15.1
>>
>>
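>> The scenario: read files from HDFS and then process the data, in batch mode with a parallelism of 10. The job is defined and run with Flink
>> SQL; the source uses connector=filesystem, format=raw, path=<HDFS file path>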
>>
>>
>> When the job runs, it sometimes succeeds, but once it fails it keeps failing. Restarting the cluster seems to fix the problem. What could be causing this?
>>
>>
>> The cluster's off-heap settings are all at their defaults:
>> taskmanager.memory.task.off-heap.size=0
>> taskmanager.memory.framework.off-heap.size=128MB
>>
>>
>> Error stack trace:
>> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
>> out-of-memory error has occurred. This can mean two things: either job(s)
>> require(s) a larger size of JVM direct memory or there is a direct memory
>> leak. The direct memory can be allocated by user code or some of its
>> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
>> configuration option should be increased. Flink framework and its
>> dependencies also consume the direct memory, mostly for network
>> communication. The most of network memory is managed by Flink and should
>> not result in out-of-memory error. In certain special cases, in particular
>> for jobs with high parallelism, the framework may require more direct
>> memory which is not managed by Flink. In this case
>> 'taskmanager.memory.framework.off-heap.size' configuration option should be
>> increased. If the error persists then there is probably a direct memory
>> leak in user code or some of its dependencies which has to be investigated
>> and fixed. The task executor has to be shutdown...
>> at java.nio.Bits.reserveMemory(Bits.java:695)
>> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>> at org.apache.hadoop.util.DirectBufferPool.getBuffer(DirectBufferPool.java:72)
>> at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.reallocPacketBuf(PacketReceiver.java:270)
>> at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:163)
>> at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
>> at org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.readNextPacket(BlockReaderRemote.java:183)
>> at org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.read(BlockReaderRemote.java:142)
>> at org.apache.hadoop.hdfs.ByteArrayStrategy.readFromBlock(ReaderStrategy.java:118)
>> at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:704)
>> at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:765)
>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:825)
>> at java.io.DataInputStream.read(DataInputStream.java:149)
>> at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:96)
>> at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:742)
>> at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:586)
>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:505)
>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:50)
>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.loadSplit(ContinuousFileReaderOperator.java:415)
>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.access$300(ContinuousFileReaderOperator.java:98)
>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$ReaderState$2.prepareToProcessRecord(ContinuousFileReaderOperator.java:122)
>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:348)
>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:240)
>> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>> at java.lang.Thread.run(Thread.java:750)
>>
>>
>> Thanks
