OK, thanks for the reply. One more question: can the taskmanager.memory.task.off-heap.size option be set dynamically with the code below?

streamTableEnv.getConfig().getConfiguration().setString(key, value);
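For context, a minimal self-contained sketch of what I mean; the "512m" value is just a placeholder:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class OffHeapConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

        // Attempting to set the TaskManager option from inside the job:
        streamTableEnv.getConfig().getConfiguration()
                .setString("taskmanager.memory.task.off-heap.size", "512m");
    }
}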

________________________________
From: Xintong Song <tonysong...@gmail.com>
Sent: November 16, 2020, 10:59
To: user-zh <user-zh@flink.apache.org>
Subject: Re: Memory overflow issue in flink-1.11.2

Then there is probably no memory leak. Most likely the job simply needs more direct memory than it has.
You can try increasing `taskmanager.memory.task.off-heap.size`, as the error message suggests.
Increasing only the TM's total memory won't help; it does not increase the direct memory available to the job.
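For example, in flink-conf.yaml (the 512m here is only an illustrative value; size it based on your job):

    taskmanager.memory.task.off-heap.size: 512m

or as a dynamic property when submitting a per-job YARN application, something like:

    flink run -m yarn-cluster -yD taskmanager.memory.task.off-heap.size=512m ...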

Thank you~

Xintong Song



On Mon, Nov 16, 2020 at 6:38 PM 史 正超 <shizhengc...@outlook.com> wrote:

> flink-on-yarn, per-job mode. On restart, the Kafka group.id was unchanged, so consumption should resume
> from the committed offsets, but the job won't start. I wonder whether a backlog that accumulated over time is the cause.
> ________________________________
> From: Xintong Song <tonysong...@gmail.com>
> Sent: November 16, 2020, 10:11
> To: user-zh <user-zh@flink.apache.org>
> Subject: Re: Memory overflow issue in flink-1.11.2
>
> What deployment mode is this? Standalone?
> After the job had run for a while and failed earlier, were all the TMs restarted when you reran it, or were some of the previous TMs reused?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Nov 16, 2020 at 5:53 PM 史 正超 <shizhengc...@outlook.com> wrote:
>
> > We are using RocksDB, with parallelism 5, 1 TM, and 5 slots; the TM is given 10 GB of memory
> > (a config sketch of this setup follows after the stack trace below). The job now fails on startup
> > with the error below. It had started successfully before; after running for a while it also hit an
> > out-of-memory error, and now, when resuming from the original offsets, it simply won't start.
> >
> > 2020-11-16 17:44:52
> > java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory
> > error has occurred. This can mean two things: either job(s) require(s) a
> > larger size of JVM direct memory or there is a direct memory leak. The
> > direct memory can be allocated by user code or some of its dependencies.
> > In this case 'taskmanager.memory.task.off-heap.size' configuration option
> > should be increased. Flink framework and its dependencies also consume
> > the direct memory, mostly for network communication. The most of network
> > memory is managed by Flink and should not result in out-of-memory error.
> > In certain special cases, in particular for jobs with high parallelism,
> > the framework may require more direct memory which is not managed by
> > Flink. In this case 'taskmanager.memory.framework.off-heap.size'
> > configuration option should be increased. If the error persists then
> > there is probably a direct memory leak in user code or some of its
> > dependencies which has to be investigated and fixed. The task executor
> > has to be shutdown...
> >     at java.nio.Bits.reserveMemory(Bits.java:658)
> >     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> >     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> >     at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
> >     at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> >     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:109)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:101)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:326)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> >     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getRecordsFromKafka(KafkaConsumerThread.java:535)
> >     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:264)
> >
> >
> >
>
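For reference, a flink-conf.yaml sketch of the setup described above, combined with the suggested fix. Treating the 10 GB as taskmanager.memory.process.size is an assumption (it could also have been passed as a YARN -ytm flag), and the off-heap sizes are only illustrative starting points:

    taskmanager.numberOfTaskSlots: 5
    taskmanager.memory.process.size: 10g
    # Raise this first, per the error message (illustrative value):
    taskmanager.memory.task.off-heap.size: 512m
    # Only if the error persists for high-parallelism network traffic:
    # taskmanager.memory.framework.off-heap.size: 256m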
