Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-23 Thread Zhanghao Chen
Hi John,

Based on the memory config screenshot provided before, each of your TMs should 
have MaxDirectMemory = 1 GB (network memory) + 128 MB (framework off-heap) = 
1152 MB. Neither taskmanager.memory.flink.size nor the total including 
MaxDirectMemory will exceed the pod's physical memory; you may check the 
detailed TM memory model [1] and double-check for yourself.
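
A minimal sketch of how those numbers fall out for this particular config, 
assuming Flink 1.14's documented defaults (framework off-heap 128 MB, task 
off-heap 0, network fraction 0.1 capped at 1 GB, JVM overhead capped at 1 GB); 
the class name is arbitrary and the TM's startup log remains the authoritative 
breakdown:

public class TmMemoryEstimate {
    public static void main(String[] args) {
        long flinkSizeMb = 16_384;      // taskmanager.memory.flink.size
        long metaspaceMb = 3_072;       // taskmanager.memory.jvm-metaspace.size

        long frameworkOffHeapMb = 128;  // assumed default
        long taskOffHeapMb = 0;         // assumed default
        // network = 0.1 * flink.size, capped at the assumed 1 GB default max
        long networkMb = Math.min((long) (flinkSizeMb * 0.1), 1_024);

        long maxDirectMb = networkMb + taskOffHeapMb + frameworkOffHeapMb;
        // JVM overhead: a fraction of the total process size, clamped to [192 MB, 1 GB];
        // with numbers this large it lands on the 1 GB cap
        long jvmOverheadMb = 1_024;
        long totalProcessMb = flinkSizeMb + metaspaceMb + jvmOverheadMb;

        System.out.println("Estimated MaxDirectMemory: " + maxDirectMb + " MB");       // 1152 MB
        System.out.println("Estimated total process size: " + totalProcessMb + " MB"); // 20480 MB, well under 32 GB
    }
}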

Maybe you can further analyze the direct memory usage using tools like JVM 
Native Memory Tracking (NMT).
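
As a lighter-weight complement to NMT, the JVM's standard BufferPoolMXBean can 
also be polled (in-process or over JMX) to watch direct-buffer usage over time; 
the small utility below is only an illustrative sketch:

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

public class DirectBufferUsage {
    public static void main(String[] args) {
        // The "direct" pool tracks ByteBuffer.allocateDirect allocations,
        // which is what the reported OutOfMemoryError is about.
        for (BufferPoolMXBean pool :
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.printf("%s: count=%d, used=%d bytes, capacity=%d bytes%n",
                    pool.getName(), pool.getCount(),
                    pool.getMemoryUsed(), pool.getTotalCapacity());
        }
    }
}

Flink's Status.JVM.Memory.Direct.* task manager metrics should report the same 
counters and may be easier to chart over a week-long run.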

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/mem_setup_tm/#detailed-memory-model

Best,
Zhanghao Chen

From: John Smith 
Sent: Thursday, May 23, 2024 22:40
To: Zhanghao Chen 
Cc: Biao Geng ; user 
Subject: Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

Based on these two settings...
taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

Reading the docs here, I'm not sure how to calculate the formula. My suspicion 
is that I may have allocated too much to taskmanager.memory.flink.size and that 
the total including MaxDirectMemory is more than what the physical OS has. Is 
that possible?
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-task-heap-size

Are you able to tell me what these numbers come out to for this formula: 
MaxDirectMemory of TM = Network Memory + Task Off-Heap + Framework Off-Heap?

On Thu, May 23, 2024 at 9:01 AM John Smith <java.dev@gmail.com> wrote:
Ok, but I still don't get why it's doing it... It's the same version of 
Flink... The only differences are Java 11, that I allocated more JVM heap, and 
that the actual physical machine has more RAM. Maybe I should reduce the JVM 
heap by a gigabyte or two?

On Wed, May 22, 2024, 12:37 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
Hi John,

A side note here: Flink will set the MaxDirectMemory of the TM = Network Memory + 
Task Off-Heap + Framework Off-Heap, and this overrides the JVM's default setting, 
regardless of the JVM version.
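
One way to verify this on a running TM (a sketch only, assuming a snippet can be 
run inside the TM JVM, e.g. from a trivial job) is to scan the JVM's input 
arguments for the -XX:MaxDirectMemorySize flag that Flink passes and compare it 
with the max heap; checking the TM process's command line on the host shows the 
same thing:

import java.lang.management.ManagementFactory;

public class CheckDirectMemoryFlag {
    public static void main(String[] args) {
        // Heap limit, for comparison with the JVM's heap-based default for direct memory
        System.out.println("Max heap bytes: " + Runtime.getRuntime().maxMemory());
        // Flink-launched TMs should carry an explicit -XX:MaxDirectMemorySize argument
        ManagementFactory.getRuntimeMXBean().getInputArguments().stream()
                .filter(a -> a.startsWith("-XX:MaxDirectMemorySize"))
                .forEach(a -> System.out.println("JVM was started with: " + a));
    }
}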

Best,
Zhanghao Chen

From: John Smith <java.dev@gmail.com>
Sent: Wednesday, May 22, 2024 22:56
To: Biao Geng <biaoge...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

Hi, apologies, I hit reply instead of reply all, so I'm not sure who saw this or 
didn't. We have not switched to SSL, and our assumption is that if we did switch 
to SSL the jobs would not work or would produce invalid results. The jobs work 
absolutely fine for a week or so and then they fail.

Here is the consumer config from the task logs, which shows that PLAINTEXT and 
port 9092 are used. I also attached a screenshot of the task manager memory 
usage. I also read up on the MaxDirectMemory setting of Java 8 vs Java 11: Java 8 
by default sets the direct memory size to 87% of the max heap size, while Java 11 
sets it to 100% of the max heap size.

[Screen Shot 2024-05-22 at 9.50.38 AM.png]

 allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [xx-kafka-0001:9092, xx-0002:9092, xx-kafka-0003:9092]
check.crcs = true
client.dns.lookup = default
client.id = xx
client.rack =
connections.max.idle.ms = 54
default.api.timeout.ms = 6
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = xx
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 500
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 6
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sa

Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-23 Thread John Smith
Based on these two settings...
taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

Reading the docs here, I'm not sure how to calculate the formula. My
suspicion is that I may have allocated too much
to taskmanager.memory.flink.size and that the total including MaxDirectMemory
is more than what the physical OS has. Is that possible?
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-task-heap-size

Are you able to tell me what these numbers come out to for this formula:
MaxDirectMemory of TM = Network Memory + Task Off-Heap + Framework Off-Heap?

On Thu, May 23, 2024 at 9:01 AM John Smith  wrote:

> Ok, but I still don't get why it's doing it... It's the same version of
> Flink... The only differences are Java 11, that I allocated more JVM heap,
> and that the actual physical machine has more RAM. Maybe I should reduce the
> JVM heap by a gigabyte or two?
>
> On Wed, May 22, 2024, 12:37 PM Zhanghao Chen 
> wrote:
>
>> Hi John,
>>
>> A side note here: Flink will set the MaxDirectMemory of the TM = Network
>> Memory + Task Off-Heap + Framework Off-Heap, and this overrides the JVM's
>> default setting, regardless of the JVM version.
>>
>> Best,
>> Zhanghao Chen
>> --
>> *From:* John Smith 
>> *Sent:* Wednesday, May 22, 2024 22:56
>> *To:* Biao Geng 
>> *Cc:* user 
>> *Subject:* Re: Would Java 11 cause Getting OutOfMemoryError: Direct
>> buffer memory?
>>
>> Hi, apologies, I hit reply instead of reply all, so I'm not sure who saw
>> this or didn't. We have not switched to SSL, and our assumption is that if
>> we did switch to SSL the jobs would not work or would produce invalid
>> results. The jobs work absolutely fine for a week or so and then they fail.
>>
>> Here is the consumer config from the task logs, which shows that PLAINTEXT
>> and port 9092 are used. I also attached a screenshot of the task manager
>> memory usage. I also read up on the MaxDirectMemory setting of Java 8 vs
>> Java 11: Java 8 by default sets the direct memory size to 87% of the max
>> heap size, while Java 11 sets it to 100% of the max heap size.
>>
>> [image: Screen Shot 2024-05-22 at 9.50.38 AM.png]
>>
>>  allow.auto.create.topics = true
>> auto.commit.interval.ms = 5000
>> auto.offset.reset = latest
>> bootstrap.servers = [xx-kafka-0001:9092, xx-0002:9092,
>> xx-kafka-0003:9092]
>> check.crcs = true
>> client.dns.lookup = default
>> client.id = xx
>> client.rack =
>> connections.max.idle.ms = 54
>> default.api.timeout.ms = 6
>> enable.auto.commit = false
>> exclude.internal.topics = true
>> fetch.max.bytes = 52428800
>> fetch.max.wait.ms = 500
>> fetch.min.bytes = 1
>> group.id = xx
>> group.instance.id = null
>> heartbeat.interval.ms = 3000
>> interceptor.classes = []
>> internal.leave.group.on.close = true
>> isolation.level = read_uncommitted
>> key.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>> max.partition.fetch.bytes = 1048576
>> max.poll.interval.ms = 30
>> max.poll.records = 500
>> metadata.max.age.ms = 30
>> metric.reporters = []
>> metrics.num.samples = 2
>> metrics.recording.level = INFO
>> metrics.sample.window.ms = 3
>> partition.assignment.strategy = [class
>> org.apache.kafka.clients.consumer.RangeAssignor]
>> receive.buffer.bytes = 65536
>> reconnect.backoff.max.ms = 1000
>> reconnect.backoff.ms = 50
>> request.timeout.ms = 6
>> retry.backoff.ms = 100
>> sasl.client.callback.handler.class = null
>> sasl.jaas.config = null
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.min.time.before.relogin = 6
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> sasl.login.callback.handler.class = null
>> sasl.login.class = null
>> sasl.login.refresh.buffer.seconds = 300
>> sasl.login.refresh.min.period.seconds = 60
>> sasl.login.refresh.window.factor = 0.8
>> sasl.login.refresh.window.jitter = 0.05
>> sasl.mechanism = GSSAPI
>> security.protocol = PLAINTEXT
>> security.providers = null
>> send.buffer.bytes = 131072
>> session.timeout.ms = 1
>> ssl.cipher.suites = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.endpoint.identification.algorithm = https
>> ssl.key.password = null
>> ssl.keymanager.algorithm = SunX509
>> ssl.keystore.location = null
>> ssl.keystore.password 

Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-16 Thread John Smith
Hi. No, I have not changed the protocol.

On Thu, May 16, 2024, 3:20 AM Biao Geng  wrote:

> Hi John,
>
> Just want to check, have you ever changed the Kafka protocol in your job
> since you started using the new cluster? The error message shows that it is
> caused by the Kafka client, and there is a similar error in this issue.
>
> Best,
> Biao Geng
>
>
> On Thu, May 16, 2024 at 09:01, John Smith wrote:
>
>> I deployed a new cluster, same version as my old cluster (1.14.4); the only
>> difference is using Java 11, and it seems that after a week of usage the
>> below exception happens.
>>
>> The task manager is...
>>
>> 32GB total
>>
>> And I have ONLY the following memory settings:
>>
>> taskmanager.memory.flink.size: 16384m
>> taskmanager.memory.jvm-metaspace.size: 3072m
>>
>>
>>
>>
>> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
>> out-of-memory error has occurred. This can mean two things: either job(s)
>> require(s) a larger size of JVM direct memory or there is a direct memory
>> leak. The direct memory can be allocated by user code or some of its
>> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
>> configuration option should be increased. Flink framework and its
>> dependencies also consume the direct memory, mostly for network
>> communication. The most of network memory is managed by Flink and should
>> not result in out-of-memory error. In certain special cases, in particular
>> for jobs with high parallelism, the framework may require more direct
>> memory which is not managed by Flink. In this case
>> 'taskmanager.memory.framework.off-heap.size' configuration option should be
>> increased. If the error persists then there is probably a direct memory
>> leak in user code or some of its dependencies which has to be investigated
>> and fixed. The task executor has to be shutdown...
>> at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
>> at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
>> at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
>> at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
>> at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
>> at
>> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
>> at
>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
>> at
>> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>> at
>> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>> at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>> at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>> at
>> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>> at
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> ... 1 more
>>
>


Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-16 Thread Biao Geng
Hi John,

Just want to check, have you ever changed the Kafka protocol in your job
since you started using the new cluster? The error message shows that it is
caused by the Kafka client, and there is a similar error in this issue.

Best,
Biao Geng


On Thu, May 16, 2024 at 09:01, John Smith wrote:

> I deployed a new cluster, same version as my old cluster (1.14.4); the only
> difference is using Java 11, and it seems that after a week of usage the
> below exception happens.
>
> The task manager is...
>
> 32GB total
>
> And I have ONLY the following memory settings:
>
> taskmanager.memory.flink.size: 16384m
> taskmanager.memory.jvm-metaspace.size: 3072m
>
>
>
>
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
> out-of-memory error has occurred. This can mean two things: either job(s)
> require(s) a larger size of JVM direct memory or there is a direct memory
> leak. The direct memory can be allocated by user code or some of its
> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
> configuration option should be increased. Flink framework and its
> dependencies also consume the direct memory, mostly for network
> communication. The most of network memory is managed by Flink and should
> not result in out-of-memory error. In certain special cases, in particular
> for jobs with high parallelism, the framework may require more direct
> memory which is not managed by Flink. In this case
> 'taskmanager.memory.framework.off-heap.size' configuration option should be
> increased. If the error persists then there is probably a direct memory
> leak in user code or some of its dependencies which has to be investigated
> and fixed. The task executor has to be shutdown...
> at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
> at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
> at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
> at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
> at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
> at
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
> at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ... 1 more
>


Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-15 Thread John Smith
I deployed a new cluster, same version as my old cluster (1.14.4); the only
difference is using Java 11, and it seems that after a week of usage the
below exception happens.

The task manager is...

32GB total

And I have ONLY the following memory settings:

taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m




Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
out-of-memory error has occurred. This can mean two things: either job(s)
require(s) a larger size of JVM direct memory or there is a direct memory
leak. The direct memory can be allocated by user code or some of its
dependencies. In this case 'taskmanager.memory.task.off-heap.size'
configuration option should be increased. Flink framework and its
dependencies also consume the direct memory, mostly for network
communication. The most of network memory is managed by Flink and should
not result in out-of-memory error. In certain special cases, in particular
for jobs with high parallelism, the framework may require more direct
memory which is not managed by Flink. In this case
'taskmanager.memory.framework.off-heap.size' configuration option should be
increased. If the error persists then there is probably a direct memory
leak in user code or some of its dependencies which has to be investigated
and fixed. The task executor has to be shutdown...
at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
at
org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
at
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more