[Help needed] Flink Statefun Runtime on Kubernetes - OutOfMemoryError Issue

2024-09-01 Thread Oliver Schmied
Hello everyone,

I have set up the Flink StateFun runtime on Kubernetes according to this tutorial: https://github.com/apache/flink-statefun-playground/tree/main/deployments/k8s. I developed 12 custom StateFun functions in Java and deployed them in the same way as shown in the tutorial, so a lot of function instances are running at the same time. After receiving about 1 million messages, the StateFun master restarts and I see the following in the log of the StateFun worker:

 

```

2024-08-31 18:52:14,327 WARN org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #0) ToFunctionRequestSummary(address=Address(func, parse, 0), batchSize=22680, totalSizeInBytes=5947037, numberOfStates=0)
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Unknown Source) ~[?:?]
    at java.nio.DirectByteBuffer.<init>(Unknown Source) ~[?:?]
    at java.nio.ByteBuffer.allocateDirect(Unknown Source) ~[?:?]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:632) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:607) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:202) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:186) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:136) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:126) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:395) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.nettyclient.NettyProtobuf.serializeProtobuf(NettyProtobuf.java:39) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyHandler.write(NettyRequestReplyHandler.java:81) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:306) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.nettyclient.NettyClient.writeAndFlush(NettyClient.java:188) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.nettyclient.NettyRequest.onChannelAcquisitionComplete(NettyRequest.java:146) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.nettyclient.NettyClient.lambda$acquireChannel$0(NettyClient.java:129) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) [statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552) [statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35) [statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:502) [statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.

Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-23 Thread Zhanghao Chen
Hi John,

Based on the memory config screenshot provided before, each of your TMs should 
have MaxDirectMemory = 1 GB (network memory) + 128 MB (framework off-heap) = 1152 MB. 
Neither taskmanager.memory.flink.size nor the total including MaxDirectMemory 
should exceed the pod's physical memory; you can check against the detailed TM 
memory model [1] and double-check for yourself.
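
For reference, that calculation spelled out — assuming Flink 1.14 defaults and 
that none of the network or off-heap options were overridden (the 10% network 
fraction and its 1 GB cap are the stock defaults):

```
network memory     = min(0.1 * total Flink memory, taskmanager.memory.network.max)
                   = min(0.1 * 16384m, 1024m)                    = 1024m
task off-heap      = taskmanager.memory.task.off-heap.size       = 0m   (default)
framework off-heap = taskmanager.memory.framework.off-heap.size  = 128m (default)

MaxDirectMemory    = 1024m + 0m + 128m                           = 1152m
```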

Maybe you can further analyze the direct memory usage using tools like JVM 
Native Memory Tracking (NMT).
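
For example, a minimal sketch of enabling NMT for the TaskManagers — the JVM flag 
and jcmd invocation are standard JVM tooling, and this assumes your Flink version 
supports the env.java.opts.taskmanager option (adjust to your deployment):

```
# flink-conf.yaml
env.java.opts.taskmanager: "-XX:NativeMemoryTracking=summary"

# then, inside the TM container/host:
jcmd <tm-pid> VM.native_memory summary
```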

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/mem_setup_tm/#detailed-memory-model

Best,
Zhanghao Chen

From: John Smith 
Sent: Thursday, May 23, 2024 22:40
To: Zhanghao Chen 
Cc: Biao Geng ; user 
Subject: Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

Based on these two settings...
taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

Reading the docs here, I'm not sure how to calculate the formula. My suspicion 
is that I may have allocated too much to taskmanager.memory.flink.size, and the 
total including MaxDirectMemory is more than what the physical OS has. Is that 
possible?
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-task-heap-size

Are you able to tell me what these numbers come out for this formula 
MaxDirectMemory of TM = Network Memory + Task Off-Heap + Framework Off-heap?

On Thu, May 23, 2024 at 9:01 AM John Smith 
<java.dev@gmail.com> wrote:
Ok, but I still don't get why it's doing it... It's the same version of 
Flink... The only difference is Java 11, and I also allocated more JVM heap, and 
the actual physical host has more RAM. Maybe I should reduce the JVM heap by a 
gigabyte or two?

On Wed, May 22, 2024, 12:37 PM Zhanghao Chen 
<zhanghao.c...@outlook.com> wrote:
Hi John,

A side note here: Flink will set the MaxDirectMemory of TM = Network Memory + 
Task Off-Heap + Framework Off-heap, and overwrites JVM's default setting, 
regardless of the version of JVM.

Best,
Zhanghao Chen

From: John Smith <java.dev@gmail.com>
Sent: Wednesday, May 22, 2024 22:56
To: Biao Geng <biaoge...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

Hi, apologies I hit reply instead of reply all. So not sure who saw this or 
didn't. We have not switched to SSL and also our assumption here would be that 
if we did switch to SSL the jobs would not work or produce invalid results. The 
jobs work absolutely fine for a week or so and then they fail.

Here is the consumer config from the task logs, which says PLAINTEXT and port 
9092 is used. I also attached a screenshot of the task manager memory usage. As 
well, I read up on the MaxDirectMemory setting of Java 8 vs Java 11: Java 8 by 
default sets the direct memory limit to roughly 87% of the max heap size, while 
Java 11 sets it to 100% of the max heap size.
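
If it helps, here is a minimal probe of the JVM's direct buffer pool via the 
standard java.lang.management API — a sketch you could run inside the TM (e.g. 
from a RichFunction's open()) to watch the direct pool against the limit; the 
class name is illustrative:

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

public class DirectPoolProbe {
    public static void main(String[] args) {
        // The "direct" pool tracks ByteBuffer.allocateDirect usage,
        // which is what counts against -XX:MaxDirectMemorySize.
        for (BufferPoolMXBean pool :
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.printf("%s pool: used=%,d bytes, capacity=%,d bytes%n",
                    pool.getName(), pool.getMemoryUsed(), pool.getTotalCapacity());
        }
    }
}
```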

[Screen Shot 2024-05-22 at 9.50.38 AM.png]

 allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [xx-kafka-0001:9092, xx-0002:9092, 
xx-kafka-0003:9092]
check.crcs = true
client.dns.lookup = default
client.id = xx
client.rack =
connections.max.idle.ms = 54
default.api.timeout.ms = 6
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = xx
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 500
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 6
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name

Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-23 Thread John Smith
Based on these two settings...
taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

Reading the docs here, I'm not sure how to calculate the formula. My
suspicion is that I may have allocated too much
to taskmanager.memory.flink.size, and the total including MaxDirectMemory is
more than what the physical OS has. Is that possible?
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-task-heap-size

Are you able to tell me what these numbers come out for this formula
MaxDirectMemory
of TM = Network Memory + Task Off-Heap + Framework Off-heap?

On Thu, May 23, 2024 at 9:01 AM John Smith  wrote:

> Ok, but I still don't get why it's doing it... It's the same version of
> Flink... The only difference is Java 11, and I also allocated more JVM heap
> and the actual physical host has more RAM. Maybe I should reduce the JVM heap
> by a gigabyte or two?
>
> On Wed, May 22, 2024, 12:37 PM Zhanghao Chen 
> wrote:
>
>> Hi John,
>>
>> A side note here: Flink will set the MaxDirectMemory of TM = Network
>> Memory + Task Off-Heap + Framework Off-heap, and overwrites JVM's default
>> setting, regardless of the version of JVM.
>>
>> Best,
>> Zhanghao Chen
>> --
>> *From:* John Smith 
>> *Sent:* Wednesday, May 22, 2024 22:56
>> *To:* Biao Geng 
>> *Cc:* user 
>> *Subject:* Re: Would Java 11 cause Getting OutOfMemoryError: Direct
>> buffer memory?
>>
>> Hi, apologies I hit reply instead of reply all. So not sure who saw this
>> or didn't. We have not switched to SSL and also our assumption here
>> would be that if we did switch to SSL the jobs would not work or produce
>> invalid results. The jobs work absolutely fine for a week or so and then
>> they fail.
>>
>> Here is the consumer config from the task logs, which says PLAINTEXT and
>> port 9092 is used. I also attached a screenshot of the task manager memory
>> usage. As well, I read up on the MaxDirectMemory setting of Java 8 vs Java 11:
>> Java 8 by default sets the direct memory limit to roughly 87% of the max heap
>> size, while Java 11 sets it to 100% of the max heap size.
>>
>> [image: Screen Shot 2024-05-22 at 9.50.38 AM.png]
>>
>>  allow.auto.create.topics = true
>> auto.commit.interval.ms = 5000
>> auto.offset.reset = latest
>> bootstrap.servers = [xx-kafka-0001:9092, xx-0002:9092,
>> xx-kafka-0003:9092]
>> check.crcs = true
>> client.dns.lookup = default
>> client.id = xx
>> client.rack =
>> connections.max.idle.ms = 54
>> default.api.timeout.ms = 6
>> enable.auto.commit = false
>> exclude.internal.topics = true
>> fetch.max.bytes = 52428800
>> fetch.max.wait.ms = 500
>> fetch.min.bytes = 1
>> group.id = xx
>> group.instance.id = null
>> heartbeat.interval.ms = 3000
>> interceptor.classes = []
>> internal.leave.group.on.close = true
>> isolation.level = read_uncommitted
>> key.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>> max.partition.fetch.bytes = 1048576
>> max.poll.interval.ms = 30
>> max.poll.records = 500
>> metadata.max.age.ms = 30
>> metric.reporters = []
>> metrics.num.samples = 2
>> metrics.recording.level = INFO
>> metrics.sample.window.ms = 3
>> partition.assignment.strategy = [class
>> org.apache.kafka.clients.consumer.RangeAssignor]
>> receive.buffer.bytes = 65536
>> reconnect.backoff.max.ms = 1000
>> reconnect.backoff.ms = 50
>> request.timeout.ms = 6
>> retry.backoff.ms = 100
>> sasl.client.callback.handler.class = null
>> sasl.jaas.config = null
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.min.time.before.relogin = 6
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> sasl.login.callback.handler.class = null
>> sasl.login.class = null
>> sasl.login.refresh.buffer.seconds = 300
>> sasl.login.refresh.min.period.seconds = 60
>> sasl.login.refresh.window.factor = 0.8
>> sasl.login.refresh.window.jitter = 0.05
>> sasl.mechanism = GSSAPI
>> security.protocol = PLAINTEXT
>> security.providers = null
>> send.buffer.bytes = 131072
>> session.timeout.ms = 1
>> ssl.cipher.suites = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.endpoint.identification.algorithm = https
>> ssl.key.password = null
>> ssl.keymanager.algorithm = SunX509
>> ssl.keystore.location = null

Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-16 Thread John Smith
Hi. No I have not changed the protocol.

On Thu, May 16, 2024, 3:20 AM Biao Geng  wrote:

> Hi John,
>
> Just want to check, have you ever changed the kafka protocol in your job
> after using the new cluster? The error message shows that it is caused by
> the kafka client and there is a similar error in this issue
> <https://ververica.zendesk.com/hc/en-us/articles/4413642980498-Direct-buffer-OutOfMemoryError-when-using-Kafka-Connector-in-Flink>
> .
>
> Best,
> Biao Geng
>
>
John Smith wrote on Thursday, May 16, 2024 at 09:01:
>
>> I deployed a new cluster, the same version as my old cluster (1.14.4); the
>> only difference is using Java 11, and it seems after a week of usage the
>> below exception happens.
>>
>> The task manager is...
>>
>> 32GB total
>>
>> And I have ONLY the following memory settings:
>>
>> taskmanager.memory.flink.size: 16384m
>> taskmanager.memory.jvm-metaspace.size: 3072m
>>
>>
>>
>>
>> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
>> out-of-memory error has occurred. This can mean two things: either job(s)
>> require(s) a larger size of JVM direct memory or there is a direct memory
>> leak. The direct memory can be allocated by user code or some of its
>> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
>> configuration option should be increased. Flink framework and its
>> dependencies also consume the direct memory, mostly for network
>> communication. The most of network memory is managed by Flink and should
>> not result in out-of-memory error. In certain special cases, in particular
>> for jobs with high parallelism, the framework may require more direct
>> memory which is not managed by Flink. In this case
>> 'taskmanager.memory.framework.off-heap.size' configuration option should be
>> increased. If the error persists then there is probably a direct memory
>> leak in user code or some of its dependencies which has to be investigated
>> and fixed. The task executor has to be shutdown...
>> at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
>> at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
>> at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
>> at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
>> at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
>> at
>> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
>> at
>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
>> at
>> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>> at
>> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>> at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>> at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>> at
>> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>> at
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> ... 1 more
>>
>


Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-16 Thread Biao Geng
Hi John,

Just want to check, have you ever changed the kafka protocol in your job
after using the new cluster? The error message shows that it is caused by
the kafka client and there is a similar error in this issue
<https://ververica.zendesk.com/hc/en-us/articles/4413642980498-Direct-buffer-OutOfMemoryError-when-using-Kafka-Connector-in-Flink>
.

Best,
Biao Geng


John Smith wrote on Thursday, May 16, 2024 at 09:01:

> I deployed a new cluster, the same version as my old cluster (1.14.4); the
> only difference is using Java 11, and it seems after a week of usage the
> below exception happens.
>
> The task manager is...
>
> 32GB total
>
> And I have ONLY the following memory settings:
>
> taskmanager.memory.flink.size: 16384m
> taskmanager.memory.jvm-metaspace.size: 3072m
>
>
>
>
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
> out-of-memory error has occurred. This can mean two things: either job(s)
> require(s) a larger size of JVM direct memory or there is a direct memory
> leak. The direct memory can be allocated by user code or some of its
> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
> configuration option should be increased. Flink framework and its
> dependencies also consume the direct memory, mostly for network
> communication. The most of network memory is managed by Flink and should
> not result in out-of-memory error. In certain special cases, in particular
> for jobs with high parallelism, the framework may require more direct
> memory which is not managed by Flink. In this case
> 'taskmanager.memory.framework.off-heap.size' configuration option should be
> increased. If the error persists then there is probably a direct memory
> leak in user code or some of its dependencies which has to be investigated
> and fixed. The task executor has to be shutdown...
> at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
> at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
> at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
> at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
> at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
> at
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
> at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ... 1 more
>


Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-15 Thread John Smith
I deployed a new cluster, the same version as my old cluster (1.14.4); the
only difference is using Java 11, and it seems after a week of usage the
below exception happens.

The task manager is...

32GB total

And I have ONLY the following memory settings:

taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m




Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
out-of-memory error has occurred. This can mean two things: either job(s)
require(s) a larger size of JVM direct memory or there is a direct memory
leak. The direct memory can be allocated by user code or some of its
dependencies. In this case 'taskmanager.memory.task.off-heap.size'
configuration option should be increased. Flink framework and its
dependencies also consume the direct memory, mostly for network
communication. The most of network memory is managed by Flink and should
not result in out-of-memory error. In certain special cases, in particular
for jobs with high parallelism, the framework may require more direct
memory which is not managed by Flink. In this case
'taskmanager.memory.framework.off-heap.size' configuration option should be
increased. If the error persists then there is probably a direct memory
leak in user code or some of its dependencies which has to be investigated
and fixed. The task executor has to be shutdown...
at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
at
org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
at
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
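
Since the error text above explicitly points at 'taskmanager.memory.task.off-heap.size'
and 'taskmanager.memory.framework.off-heap.size', here is a minimal sketch of what
raising them would look like in flink-conf.yaml — the added values are illustrative
assumptions, not tested recommendations:

```
taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m
# direct memory for user code and its dependencies (e.g. the Kafka client):
taskmanager.memory.task.off-heap.size: 512m
# direct memory headroom for Flink's own framework/network stack:
taskmanager.memory.framework.off-heap.size: 256m
```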


Re: OutOfMemoryError (java heap space) on small, local test

2022-10-31 Thread Matt Fysh
Thanks Leonard for taking a look. It seems odd that returning a list of
objects can cause a fatal error such as this. Since I am new to Flink
and also relatively new to Python, I assume that I am doing
something wrong, as returning a list of objects is a fairly common data
modelling scenario.

Please let me know which sections of the docs, or which areas of Python, I
should read to learn how to find a solution to this problem

Thanks

On Mon, 31 Oct 2022 at 18:49, Leonard Xu  wrote:

> Hi, Matt
>
> I’ve checked and your job is pretty simple. I've CC'ed Xingbo, who is a
> PyFlink expert, to help take a quick look.
>
>
> Best,
> Leonard
>
> On Oct 31, 2022, at 11:47 AM, Matt Fysh wrote:
>
> Hi there,
>
> I am running a local test with:
> * source = env.from_collection
> * sink = datastream.execute_and_collect
> with a map function between, and two very small data points in the
> collection
>
> I'm able to generate an OutOfMemoryError, and due to the nature of this
> test using simple source and sink, plus not having large data size
> requirements, I suspect this is due to a bug.
>
> I'm running v1.13.2 and have created a docker-based reproduction
> repository here: https://github.com/mattfysh/pyflink-oom
>
> Please take a look and let me know what you think
>
> Thanks!
> Matt
>
>
>


Re: OutOfMemoryError (java heap space) on small, local test

2022-10-31 Thread Leonard Xu
Hi, Matt

I’ve checked and your job is pretty simple. I've CC'ed Xingbo, who is a PyFlink 
expert, to help take a quick look. 


Best,
Leonard

> On Oct 31, 2022, at 11:47 AM, Matt Fysh wrote:
> 
> Hi there,
> 
> I am running a local test with:
> * source = env.from_collection
> * sink = datastream.execute_and_collect
> with a map function between, and two very small data points in the collection
> 
> I'm able to generate an OutOfMemoryError, and due to the nature of this test 
> using simple source and sink, plus not having large data size requirements, I 
> suspect this is due to a bug.
> 
> I'm running v1.13.2 and have created a docker-based reproduction repository 
> here: https://github.com/mattfysh/pyflink-oom 
> <https://github.com/mattfysh/pyflink-oom>
> 
> Please take a look and let me know what you think
> 
> Thanks!
> Matt



OutOfMemoryError (java heap space) on small, local test

2022-10-30 Thread Matt Fysh
Hi there,

I am running a local test with:
* source = env.from_collection
* sink = datastream.execute_and_collect
with a map function between, and two very small data points in the
collection

I'm able to generate an OutOfMemoryError, and due to the nature of this
test using simple source and sink, plus not having large data size
requirements, I suspect this is due to a bug.

I'm running v1.13.2 and have created a docker-based reproduction repository
here: https://github.com/mattfysh/pyflink-oom

Please take a look and let me know what you think

Thanks!
Matt


Re: Broadcast state and OutOfMemoryError: Direct buffer memory

2022-10-21 Thread yanfei lei
Hi Dan,
Usually broadcast state needs more network buffers; the network buffers used
to exchange data records between tasks request a portion of direct
memory [1], so I think it is possible to get the “Direct buffer memory” OOM
errors in this scenario. Maybe you can try to increase
taskmanager.memory.framework.off-heap.size and
taskmanager.memory.task.off-heap.size.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#detailed-memory-model

Best,
Yanfei

Dan Hill wrote on Friday, Oct 21, 2022 at 15:39:

> Hi.  My team recently added broadcast state to our Flink jobs.  We've
> started hitting this OOM "Direct buffer memory" error.  Is this a common
> problem with broadcast state?  Or is it likely a different problem?
> Thanks! - Dan
>


Broadcast state and OutOfMemoryError: Direct buffer memory

2022-10-21 Thread Dan Hill
Hi.  My team recently added broadcast state to our Flink jobs.  We've
started hitting this OOM "Direct buffer memory" error.  Is this a common
problem with broadcast state?  Or is it likely a different problem?
Thanks! - Dan


Re: OutOfMemoryError: Java heap space while implmentating flink sql api

2022-01-13 Thread Martijn Visser
Hi Ronak,

As mentioned in the Flink Community & Project information [1] the primary
place for support are the mailing lists and user support should go to the
User mailing list. Keep in mind that this is still done by the community
and set up for asynchronous handling. If you want to have quick
acknowledgment or SLAs, there are vendors that can offer commercial support
on Flink.

You can't compare the two statements, because in your first join you're
also applying a TUMBLE. That means that you're not only maintaining state
for your join, but also for your window. You're also using the old Group
Window Aggregation function; it's recommended to use Window TVFs due to
better performance optimizations [2].
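
For illustration, a sketch of the same tumbling count in both styles — the table
and column names are placeholders, and the TVF form assumes a processing-time
attribute column (here called proctime_col) is declared on the table:

```sql
-- Old style: group window aggregation
SELECT COUNT(*)
FROM cdrTable
GROUP BY TUMBLE(PROCTIME(), INTERVAL '1' MINUTE);

-- Recommended: windowing TVF (Flink 1.13+)
SELECT window_start, window_end, COUNT(*)
FROM TABLE(
    TUMBLE(TABLE cdrTable, DESCRIPTOR(proctime_col), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end;
```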

Best regards,

Martijn

[1]
https://flink.apache.org/community.html#how-do-i-get-help-from-apache-flink
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#group-window-aggregation


On Thu, 13 Jan 2022 at 06:33, Ronak Beejawat (rbeejawa) 
wrote:

> HI Martijn,
>
>
>
> I posted the below query in both places (the Flink mailing list and Stack
> Overflow) to get a quick response on it.
>
> Please let me know the exact POC / mailing list to post my queries to if this
> is causing trouble, so at least we can get quick acknowledgement of the issues
> reported.
>
>
>
> Ok let me ask the below question in a simpler way
>
>
>
> *Join 1 *
>
>
>
> select * from cdrTable left join  ccmversionsumapTable cvsm ON
> (cdrTable.version = ccmversionsumapTable.ccmversion) group by
> TUMBLE(PROCTIME(), INTERVAL '1' MINUTE), …
>
> (2.5 million rows left-joined with 23 records fails to compute and
> throws a heap error)
>
> Note: This is small join example as compared to Join2 condition as shown
> below. here we are using different connector for reading cdrTable -> kafka
> connector and ccmversionsumapTable -> jdbc connector
>
>
>
> *Join 2*
>
>
>
> select * from cdrTable left join cmrTable cmr on (cdr.org_id =
> cmr.org_id AND cdr.cluster_id = cmr.cluster_id AND
> cdr.globalcallid_callmanagerid = cmr.globalcallid_callmanagerid AND
> cdr.globalcallid_callid = cmr.globalcallid_callid AND
> (cdr.origlegcallidentifier = cmr.callidentifier OR
> cdr.destlegcallidentifier = cmr.callidentifier)), … (2.5 million left join
> with 5 million rows computes properly without any heap error)
>
> Note: This is bigger join example as compared to Join1 condition as shown
> above. here we are using same connector for reading cdrTable , cmrTable ->
> kafka connector
>
>
>
> So the question is: the small join throws a heap error, while the much
> bigger join works properly. Please help us with this.
>
>
>
> Thanks
>
> Ronak Beejawat
>
>
>
> *From: *Martijn Visser 
> *Date: *Wednesday, 12 January 2022 at 7:43 PM
> *To: *dev 
> *Cc: *commun...@flink.apache.org ,
> user@flink.apache.org , Hang Ruan <
> ruanhang1...@gmail.com>, Shrinath Shenoy K (sshenoyk) ,
> Jayaprakash Kuravatti (jkuravat) , Krishna Singitam
> (ksingita) , Nabhonil Sinha (nasinha) <
> nasi...@cisco.com>, Vibhor Jain (vibhjain) ,
> Raghavendra Jsv (rjsv) , Arun Yadav (aruny) <
> ar...@cisco.com>, Avi Sanwal (asanwal) 
> *Subject: *Re: OutOfMemoryError: Java heap space while implmentating
> flink sql api
>
> Hi Ronak,
>
>
>
> I would like to ask you to stop cross-posting to all the Flink mailing
> lists and then also post the same question to Stackoverflow. Both the
> mailing lists and Stackoverflow are designed for asynchronous communication
> and you should allow the community some days to address your question.
>
>
>
> Joins are state heavy. As mentioned in the documentation [1] "Thus, the
> required state for computing the query result might grow infinitely
> depending on the number of distinct input rows of all input tables and
> intermediate join results."
>
>
>
> Best regards,
>
>
>
> Martijn
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/
>
>
>
>
>
> On Wed, 12 Jan 2022 at 11:06, Ronak Beejawat (rbeejawa) <
> rbeej...@cisco.com.invalid> wrote:
>
> Hi Team,
>
> I was trying to implement a Flink SQL API join with 2 tables and it is
> throwing OutOfMemoryError: Java heap space. PFB a screenshot of the Flink
> cluster memory details.
> [Flink Memory Model][1]
>
>
>   [1]: https://i.stack.imgur.com/AOnQI.png
>
> **PFB below code snippet which I was trying:**
> ```
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inStreamingMode().bu

Re: OutOfMemoryError: Java heap space while implmentating flink sql api

2022-01-12 Thread Martijn Visser
Hi Ronak,

I would like to ask you to stop cross-posting to all the Flink mailing
lists and then also post the same question to Stackoverflow. Both the
mailing lists and Stackoverflow are designed for asynchronous communication
and you should allow the community some days to address your question.

Joins are state heavy. As mentioned in the documentation [1] "Thus, the
required state for computing the query result might grow infinitely
depending on the number of distinct input rows of all input tables and
intermediate join results."
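
One common mitigation — a sketch, assuming unbounded join state is the issue and
approximate results are acceptable for your use case — is to bound how long idle
state is retained, so it can be expired instead of growing forever. Note that
expired rows silently drop out of the join result:

```java
import java.time.Duration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleStateRetentionExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // State that has not been accessed for an hour becomes
        // eligible for cleanup.
        tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
    }
}
```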

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/


On Wed, 12 Jan 2022 at 11:06, Ronak Beejawat (rbeejawa)
 wrote:

> Hi Team,
>
> I was trying to implement a Flink SQL API join with 2 tables and it is
> throwing OutOfMemoryError: Java heap space. PFB a screenshot of the Flink
> cluster memory details.
> [Flink Memory Model][1]
>
>
>   [1]: https://i.stack.imgur.com/AOnQI.png
>
> **PFB below code snippet which I was trying:**
> ```
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inStreamingMode().build();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
>
>
> tableEnv.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy",
> "TWO_PHASE");
> tableEnv.getConfig().getConfiguration().setString("table.optimizer.join-reorder-enabled",
> "true");
> tableEnv.getConfig().getConfiguration().setString("table.exec.resource.default-parallelism",
> "16");
>
> tableEnv.executeSql("CREATE TEMPORARY TABLE ccmversionsumapTable (\r\n"
>  + "  suname STRING\r\n"
>  + "  ,ccmversion STRING\r\n"
>  + "   )\r\n"
>  + "   WITH (\r\n"
>  + "   'connector' =
> 'jdbc'\r\n"
>  + "   ,'url' =
> 'jdbc:mysql://:3306/ccucdb'\r\n"
>  + "   ,'table-name' =
> 'ccmversionsumap'\r\n"
>  + "   ,'username' =
> '*'\r\n"
>  + "   ,'password' =
> ''\r\n"
>  + "   )");
>
> tableEnv.executeSql("CREATE TEMPORARY TABLE cdrTable (\r\n"
>+ "   org_id STRING\r\n"
>+ "   ,cluster_id STRING\r\n"
>+ "   ,cluster_name STRING\r\n"
>+ "   ,version STRING\r\n"
>+ "   ,ip_address STRING\r\n"
>+ "   ,pkid STRING\r\n"
>+ "   ,globalcallid_callid INT\r\n"
>   ... --- multiple columns can be added
>+ "   )\r\n"
>+ "   WITH (\r\n"
>+ "   'connector' = 'kafka'\r\n"
>+ "   ,'topic' = 'cdr'\r\n"
>+ "   ,'properties.bootstrap.servers' =
> ':9092'\r\n"
>+ "   ,'scan.startup.mode' =
> 'earliest-offset'\r\n"
>//+ ",'value.fields-include' =
> 'EXCEPT_KEY'\r\n"
>+ "   ,'format' = 'json'\r\n"
>+ "   )");
>
>
> String sql = "SELECT cdr.org_id orgid,\r\n"
>   + "
>  cdr.cluster_name clustername,\r\n"
>   + "
>  cdr.cluster_id clusterid,\r\n"
>   + "
>  cdr.ip_address clusteripaddr,\r\n"
>   + "
>  cdr.version clusterversion,\r\n"
>   + "
>  c

Re: OutOfMemoryError: Java heap space while implmentating flink sql api

2022-01-12 Thread Roman Khachatryan
Hi Ronak,

You shared a screenshot of JM. Do you mean that exception also happens
on JM? (I'd rather assume TM).

Could you explain the join clause: left join ccmversionsumapTable cvsm
ON (cdr.version = cvsm.ccmversion)
"version" doesn't sound very selective, so maybe you end up with
(almost) Cartesian product?

Regards,
Roman

On Wed, Jan 12, 2022 at 11:06 AM Ronak Beejawat (rbeejawa)
 wrote:
>
> Hi Team,
>
> I was trying to implement a Flink SQL API join with 2 tables and it is 
> throwing OutOfMemoryError: Java heap space. PFB a screenshot of the Flink 
> cluster memory details.
> [Flink Memory Model][1]
>
>
>   [1]: https://i.stack.imgur.com/AOnQI.png
>
> **PFB below code snippet which I was trying:**
> ```
> EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().inStreamingMode().build();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
>
>
> tableEnv.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy",
>  "TWO_PHASE");
> tableEnv.getConfig().getConfiguration().setString("table.optimizer.join-reorder-enabled",
>  "true");
> tableEnv.getConfig().getConfiguration().setString("table.exec.resource.default-parallelism",
>  "16");
>
> tableEnv.executeSql("CREATE TEMPORARY TABLE ccmversionsumapTable (\r\n"
>  + "  suname STRING\r\n"
>  + "  ,ccmversion STRING\r\n"
>  + "   )\r\n"
>  + "   WITH (\r\n"
>  + "   'connector' = 
> 'jdbc'\r\n"
>  + "   ,'url' = 
> 'jdbc:mysql://:3306/ccucdb'\r\n"
>  + "   ,'table-name' = 
> 'ccmversionsumap'\r\n"
>  + "   ,'username' = 
> '*'\r\n"
>  + "   ,'password' = 
> ''\r\n"
>  + "   )");
>
> tableEnv.executeSql("CREATE TEMPORARY TABLE cdrTable (\r\n"
>+ "   org_id STRING\r\n"
>+ "   ,cluster_id STRING\r\n"
>+ "   ,cluster_name STRING\r\n"
>+ "   ,version STRING\r\n"
>+ "   ,ip_address STRING\r\n"
>+ "   ,pkid STRING\r\n"
>+ "   ,globalcallid_callid INT\r\n"
>   ... --- multiple columns can be added
>+ "   )\r\n"
>+ "   WITH (\r\n"
>+ "   'connector' = 'kafka'\r\n"
>+ "   ,'topic' = 'cdr'\r\n"
>+ "   ,'properties.bootstrap.servers' = 
> ':9092'\r\n"
>+ "   ,'scan.startup.mode' = 
> 'earliest-offset'\r\n"
>//+ ",'value.fields-include' = 
> 'EXCEPT_KEY'\r\n"
>+ "   ,'format' = 'json'\r\n"
>+ "   )");
>
>
> String sql = "SELECT cdr.org_id orgid,\r\n"
>   + " 
> cdr.cluster_name clustername,\r\n"
>   + " 
> cdr.cluster_id clusterid,\r\n"
>   + " 
> cdr.ip_address clusteripaddr,\r\n"
>   + " 
> cdr.version clusterversion,\r\n"
>   + " 
> cvsm.suname clustersuname,\r\n"
>   + " 
> cdr.pkid cdrpkid,\r\n"
>   ... --- 
> multiple columns can be 

Re: Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-09 Thread Arvid Heise
Afaik the main issue is that the JDBC drivers are leaking as they usually
assume only one classloader. If you are aware of it, you can bundle it in
your jar. However, you are right - it doesn't help with OP, so it was
probably not a good idea.

On Fri, Apr 9, 2021 at 11:45 AM Maciek Próchniak  wrote:

> Hi Arvid,
>
> "You can still bundle it into your jar if you prefer it." - is it really
> the case with JDBC drivers? I think that if the driver is not on Flink main
> classpath (that is, in the lib folder) there is no way the class would be
> loaded by main classloader - regardless of parent/child classloader setting?
>
> Those settings will help if the driver is both on Flink classpath and in
> user jar - I noticed now the documentation is slightly misleading
> suggesting otherwise, isn't it?
>
>
> thanks,
>
> maciek
>
>
> On 09.04.2021 11:25, Arvid Heise wrote:
>
> Hi,
>
> What do you mean by light-weight way? Just to clarify: you copy the jar
> once in the lib folder and restart the cluster once (and put it into the
> lib/ for future clusters). Not sure how it would be more light-weight.
>
> You can still bundle it into your jar if you prefer it. It just tends to
> be big but if it's easier for you to not touch the cluster, then just put
> everything into your jar.
>
> On Fri, Apr 9, 2021 at 4:08 AM 太平洋 <495635...@qq.com> wrote:
>
>> I have tried to add 'classloader.parent-first-patterns.additional:
>> "ru.yandex.clickhouse" ' to flink-config, but the problem still exists.
>> Is there a lightweight way to put the clickhouse JDBC driver in Flink's lib/
>> folder?
>>
>>
>> -- Original message --
>> *From:* "Maciek Próchniak";
>> *Sent:* Friday, April 9, 2021, 3:24 AM
>> *To:* "太平洋" <495635...@qq.com>; "Arvid Heise"; "Yangze
>> Guo";
>> *Cc:* "user"; "guowei.mgw"; "renqschn";
>> *Subject:* Re: Re: period batch job lead to OutOfMemoryError: Metaspace
>> problem
>>
>> Hi,
>>
>> Did you put the clickhouse JDBC driver on Flink main classpath (in lib
>> folder) and not in user-jar - as described here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/debugging/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code
>> ?
>>
>> When we encountered Metaspace leaks recently, in quite a few cases it
>> turned out that the problem was the JDBC driver in the user classloader,
>> which was registered by DriverManager and caused a classloader leak.
>>
>>
>> maciek
>>
>>
>> On 08.04.2021 11:42, 太平洋 wrote:
>>
>> My application program looks like this. Does this structure have some
>> problem?
>>
>> public class StreamingJob {
>> public static void main(String[] args) throws Exception {
>> int i = 0;
>> while (i < 100) {
>> try {
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>> env.setParallelism(Parallelism);
>>
>> EnvironmentSettings bsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner()
>> .inStreamingMode().build();
>> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env,
>> bsSettings);
>>
>> bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
>> Table t = bsTableEnv.sqlQuery(query);
>>
>> DataStream<DataPoint> points = bsTableEnv.toAppendStream(t,
>> DataPoint.class);
>>
>> DataStream weightPoints = points.map();
>>
>> DataStream predictPoints = weightPoints.keyBy()
>> .reduce().map();
>>
>> // side output
>> final OutputTag outPutPredict = new
>> OutputTag("predict") {
>> };
>>
>> SingleOutputStreamOperator mainDataStream = predictPoints
>> .process();
>>
>> DataStream exStream =
>> mainDataStream.getSideOutput(outPutPredict);
>>
>> //write data to clickhouse
>> String insertIntoCKSql = "xxx";
>> mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql, new CkSinkBuilder(),
>> new JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
>> new
>> JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(CkDriverName)
>> .withUrl(CkUrl).withUsername(CkUser).withPassword(CkPassword).build()));
>>
>> // write data to kafka
>> FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
>> exStream.map().addSink(producer);
>>
>> env.execute("Prediction P

Re: Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-09 Thread Maciek Próchniak

Hi Arvid,

"You can still bundle it into your jar if you prefer it." - is it really 
the case with JDBC drivers? I think that if the driver is not on Flink 
main classpath (that is, in the lib folder) there is no way the class 
would be loaded by main classloader - regardless of parent/child 
classloader setting?


Those settings will help if the driver is both on Flink classpath and in 
user jar - I noticed now the documentation is slightly misleading 
suggesting otherwise, isn't it?



thanks,

maciek


On 09.04.2021 11:25, Arvid Heise wrote:

Hi,

What do you mean by light-weight way? Just to clarify: you copy the 
jar once in the lib folder and restart the cluster once (and put it 
into the lib/ for future clusters). Not sure how it would be more 
light-weight.


You can still bundle it into your jar if you prefer it. It just tends 
to be big but if it's easier for you to not touch the cluster, then 
just put everything into your jar.


On Fri, Apr 9, 2021 at 4:08 AM 太平洋 <495635...@qq.com> wrote:


I have tried to add
'classloader.parent-first-patterns.additional:
"ru.yandex.clickhouse" ' to flink-config, but the problem still exists.
Is there a lightweight way to put the clickhouse JDBC driver in Flink's
lib/ folder?

-- Original message --
*From:* "Maciek Próchniak" <m...@touk.pl>;
*Sent:* Friday, April 9, 2021, 3:24 AM
*To:* "太平洋" <495635...@qq.com>; "Arvid Heise" <ar...@apache.org>; "Yangze
Guo" <karma...@gmail.com>;
*Cc:* "user" <user@flink.apache.org>; "guowei.mgw" <guowei@gmail.com>; "renqschn" <renqs...@gmail.com>;
*Subject:* Re: Re: period batch job lead to OutOfMemoryError:
Metaspace problem

Hi,

Did you put the clickhouse JDBC driver on Flink main classpath (in
lib folder) and not in user-jar - as described here:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/debugging/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code

?

When we encountered Metaspace leaks recently, in quite a few cases
it turned out that the problem was the JDBC driver in the user
classloader, which was registered by DriverManager and caused a
classloader leak.


maciek


On 08.04.2021 11:42, 太平洋 wrote:

My application program looks like this. Does this structure have
some problem?

public class StreamingJob {
public static void main(String[] args) throws Exception {
int i = 0;
while (i < 100) {
try {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(Parallelism);

EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner()
.inStreamingMode().build();
StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(env, bsSettings);

bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
Table t = bsTableEnv.sqlQuery(query);

DataStream<DataPoint> points = bsTableEnv.toAppendStream(t,
DataPoint.class);

DataStream weightPoints = points.map();

DataStream predictPoints = weightPoints.keyBy()
.reduce().map();

// side output
final OutputTag outPutPredict = new
OutputTag("predict") {
};

SingleOutputStreamOperator mainDataStream =
predictPoints
.process();

DataStream exStream =
mainDataStream.getSideOutput(outPutPredict);

                                        //write data to clickhouse
String insertIntoCKSql = "xxx";
mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql, new
CkSinkBuilder(),
new
JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
new

JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(CkDriverName)
.withUrl(CkUrl).withUsername(CkUser).withPassword(CkPassword).build()));

// write data to kafka
FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
exStream.map().addSink(producer);

env.execute("Prediction Program");
} catch (Exception e) {
e.printStackTrace();
}
i++;
Thread.sleep(window * 1000);
}
}
}



-- Original message --
*From:* "Arvid Heise" <ar...@apache.org>;
*Sent:* Thursday, April 8, 2021, 2:33 PM
*To:* "Yangze Guo" <karma...@gmail.com>;
*Cc:* "太平洋" <495635...@qq.com>; "user" <user@flink.

Re: Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-09 Thread Arvid Heise
Hi,

What do you mean by light-weight way? Just to clarify: you copy the jar
once in the lib folder and restart the cluster once (and put it into the
lib/ for future clusters). Not sure how it would be more light-weight.

You can still bundle it into your jar if you prefer it. It just tends to be
big but if it's easier for you to not touch the cluster, then just put
everything into your jar.

On Fri, Apr 9, 2021 at 4:08 AM 太平洋 <495635...@qq.com> wrote:

> I have tried to add 'classloader.parent-first-patterns.additional:
> "ru.yandex.clickhouse" ' to flink-config, but the problem still exists.
> Is there a lightweight way to put the clickhouse JDBC driver in Flink's lib/
> folder?
>
>
> -- Original message --
> *From:* "Maciek Próchniak";
> *Sent:* Friday, April 9, 2021, 3:24 AM
> *To:* "太平洋" <495635...@qq.com>; "Arvid Heise"; "Yangze
> Guo";
> *Cc:* "user"; "guowei.mgw"; "renqschn";
> *Subject:* Re: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> Hi,
>
> Did you put the clickhouse JDBC driver on Flink main classpath (in lib
> folder) and not in user-jar - as described here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/debugging/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code
> ?
>
> When we encountered Metaspace leaks recently, in quite a few cases it
> turned out that the problem was the JDBC driver in the user classloader,
> which was registered by DriverManager and caused a classloader leak.
>
>
> maciek
>
>
> On 08.04.2021 11:42, 太平洋 wrote:
>
> My application program looks like this. Does this structure have some
> problem?
>
> public class StreamingJob {
> public static void main(String[] args) throws Exception {
> int i = 0;
> while (i < 100) {
> try {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> env.setParallelism(Parallelism);
>
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner()
> .inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env,
> bsSettings);
>
> bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
> Table t = bsTableEnv.sqlQuery(query);
>
> DataStream<DataPoint> points = bsTableEnv.toAppendStream(t,
> DataPoint.class);
>
> DataStream weightPoints = points.map();
>
> DataStream predictPoints = weightPoints.keyBy()
> .reduce().map();
>
> // side output
> final OutputTag outPutPredict = new
> OutputTag("predict") {
> };
>
> SingleOutputStreamOperator mainDataStream = predictPoints
> .process();
>
> DataStream exStream =
> mainDataStream.getSideOutput(outPutPredict);
>
> //write data to clickhouse
> String insertIntoCKSql = "xxx";
> mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql, new CkSinkBuilder(),
> new JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
> new
> JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(CkDriverName)
> .withUrl(CkUrl).withUsername(CkUser).withPassword(CkPassword).build()));
>
> // write data to kafka
> FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
> exStream.map().addSink(producer);
>
> env.execute("Prediction Program");
> } catch (Exception e) {
> e.printStackTrace();
> }
> i++;
> Thread.sleep(window * 1000);
> }
> }
> }
>
>
>
> -- Original message --
> *From:* "Arvid Heise";
> *Sent:* Thursday, April 8, 2021, 2:33 PM
> *To:* "Yangze Guo";
> *Cc:* "太平洋" <495635...@qq.com>; "user"; "guowei.mgw"; "renqschn";
> *Subject:* Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> Hi,
>
> ChildFirstClassLoaders are created (more or less) per application jar, and
> seeing so many looks like a classloader leak to me. I'd expect you to see a
> new ChildFirstClassLoader popping up with each new job submission.
>
> Can you check who is referencing the ChildFirstClassLoader transitively?
> Usually, it's some thread that is lingering around because some third party
> library is leaking threads etc.
>
> OneInputStreamTask is legit and just indicates that you have a job running
> with 4 slots on that TM. It should not hold any dedicated metaspace memory.
>
> On Thu, Apr 8, 2021 at 4:52 AM Yangze Guo  wrote:
>
>> I went through the JM & TM logs but could not find any valuable clue.
>> The exception is actually 

Re: Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-08 Thread 太平洋
I have tried to add 'classloader.parent-first-patterns.additional: 
"ru.yandex.clickhouse" ' to flink-config, but the problem still exists.
Is there a lightweight way to put the clickhouse JDBC driver in Flink's lib/ folder?
 


-- Original message --
From: "Maciek Próchniak"

Did you put the clickhouse JDBC driver on Flink main classpath (in lib folder) and not in user-jar - as described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/debugging/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code ?

When we encountered Metaspace leaks recently, in quite a few cases it
turned out that the problem was the JDBC driver in the user classloader,
which was registered by DriverManager and caused a classloader leak.

maciek

On 08.04.2021 11:42, 太平洋 wrote:

My application program looks like this. Does this structure have some problem?

public class StreamingJob {
public static void main(String[] args) throws Exception {
int i = 0;
while (i < 100) {
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(Parallelism);

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner()
.inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
Table t = bsTableEnv.sqlQuery(query);

DataStream
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace
> >
> > Best,
> > Yangze Guo
> >
> > Best,
> > Yangze Guo
> >
> >
> > On Tue, Apr 6, 2021 at 4:22 PM 太平洋
<495635...@qq.com> wrote:
> > >
> > > batch job:
> > > read data from s3 by sql, then by some
operators and write data to clickhouse and kafka.
> > > after some times, task-manager quit with
OutOfMemoryError: Metaspace.
> > >
> > > env:
> > > flink version: 1.12.2
> > > task-manager slot count: 5
> > > deployment: standalone kubernetes session
> > > dependencies:
 > > >     

Re: Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-08 Thread Maciek Próchniak

Hi,

Did you put the clickhouse JDBC driver on Flink main classpath (in lib 
folder) and not in user-jar - as described here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/debugging/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code?


When we encountered Metaspace leaks recently, in quite a few cases it 
turned out that the problem was the JDBC driver in the user classloader, 
which was registered by DriverManager and caused a classloader leak.
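
To make the mechanism concrete: java.sql.DriverManager lives in the system
classloader and keeps a static registry of drivers, so a driver class loaded
from the user jar pins Flink's ChildFirstClassLoader for the lifetime of the
JVM. A minimal sketch of a cleanup you could run on job teardown — illustrative
only, not a substitute for putting the driver in lib/:

```java
import java.sql.Driver;
import java.sql.DriverManager;
import java.util.Enumeration;

public final class JdbcDriverCleanup {
    /** Deregister all JDBC drivers that were loaded by the given classloader. */
    public static void deregisterDriversLoadedBy(ClassLoader cl) throws Exception {
        Enumeration<Driver> drivers = DriverManager.getDrivers();
        while (drivers.hasMoreElements()) {
            Driver driver = drivers.nextElement();
            if (driver.getClass().getClassLoader() == cl) {
                // Remove the registry entry that pins the user classloader.
                DriverManager.deregisterDriver(driver);
            }
        }
    }
}
```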



maciek


On 08.04.2021 11:42, 太平洋 wrote:
My application program looks like this. Does this structure have some
problem?


public class StreamingJob {
    public static void main(String[] args) throws Exception {
        int i = 0;
        while (i < 100) {
            try {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.BATCH);
                env.setParallelism(Parallelism);

                EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                        .useBlinkPlanner().inStreamingMode().build();
                StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

                bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
                Table t = bsTableEnv.sqlQuery(query);

                DataStream<DataPoint> points = bsTableEnv.toAppendStream(t, DataPoint.class);

                DataStream<DataPoint> weightPoints = points.map();

                DataStream<DataPoint> predictPoints = weightPoints.keyBy()
                        .reduce().map();

                // side output
                final OutputTag<DataPoint> outPutPredict = new OutputTag<DataPoint>("predict") {
                };

                SingleOutputStreamOperator<DataPoint> mainDataStream = predictPoints
                        .process();

                DataStream<DataPoint> exStream = mainDataStream.getSideOutput(outPutPredict);

                // write data to clickhouse
                String insertIntoCKSql = "xxx";
                mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql, new CkSinkBuilder(),
                        new JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withDriverName(CkDriverName)
                                .withUrl(CkUrl).withUsername(CkUser).withPassword(CkPassword).build()));

                // write data to kafka
                FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
                exStream.map().addSink(producer);

                env.execute("Prediction Program");
            } catch (Exception e) {
                e.printStackTrace();
            }
            i++;
            Thread.sleep(window * 1000);
        }
    }
}



-- Original message --
From: "Arvid Heise";
Date: Thursday, April 8, 2021, 2:33 PM
To: "Yangze Guo";
Cc: "太平洋" <495635...@qq.com>; "user"; "guowei.mgw"; "renqschn";
Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem

Hi,

ChildFirstClassLoader are created (more or less) by application jar 
and seeing so many looks like a classloader leak to me. I'd expect you 
to see a new ChildFirstClassLoader popping up with each new job 
submission.


Can you check who is referencing the ChildFirstClassLoader 
transitively? Usually, it's some thread that is lingering around 
because some third party library is leaking threads etc.


OneInputStreamTask is legit and just indicates that you have a job 
running with 4 slots on that TM. It should not hold any dedicated 
metaspace memory.


On Thu, Apr 8, 2021 at 4:52 AM Yangze Guo <karma...@gmail.com> wrote:


I went through the JM & TM logs but could not find any valuable clue.
The exception is actually thrown by kafka-producer-network-thread.
Maybe @Qingsheng could also take a look?


Best,
Yangze Guo

On Thu, Apr 8, 2021 at 10:39 AM 太平洋 <495635...@qq.com> wrote:
>
> I have configured to 512M, but problem still exist. Now the
memory size is still 256M.
> Attachments are TM and JM logs.
>
> Look forward to your reply.
>
> -- Original message --
> From: "Yangze Guo" <karma...@gmail.com>;
> Date: Tuesday, April 6, 2021, 6:35 PM
> To: "太平洋" <495635...@qq.com>;
> Cc: "user" <user@flink.apache.org>; "guowei.mgw";
> Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> > I have tried this method, but the problem still exist.
> How much memory do you configure for it?
>
> > is 21 instances of "org.apache.flink.util.ChildFirstClassLoader" normal
> Not quite sure about it. AFAIK, each job will have a classloader.
> Multiple tasks of the same job in the same TM will share the same
> classloader. The classloader will be removed if there is no more task
> running on the TM. Classloader without reference will be finally
> cleanup by GC. Could you share JM and TM logs for further analysis?

Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-08 Thread Yangze Guo
IIUC, your program will finally generate 100 ChildFirstClassLoaders in
a TM, but they should always be GC'd when the job finishes. So, as Arvid said,
you'd better check who is referencing those ChildFirstClassLoaders.
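
For reference, the pattern that produces one classloader per iteration is the job submission inside the loop. A restructuring that submits once and lets windowing drive the periodic work avoids the repeated submissions entirely; a minimal sketch of the idea with a stand-in source, key, and sink (all hypothetical, not the original program):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Hypothetical restructuring: run ONE long-lived streaming job instead of
// submitting a new batch job every `window` seconds. A single submission
// means a single ChildFirstClassLoader for the job's whole lifetime, so
// metaspace usage stays flat.
public class SingleSubmissionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromSequence(0, Long.MAX_VALUE)                             // stand-in for the real S3/SQL source
           .keyBy(v -> v % 100)                                         // hypothetical key selector
           .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))  // periodicity instead of the while loop
           .reduce(Long::sum)                                           // hypothetical aggregation
           .print();                                                    // stand-in for the ClickHouse/Kafka sinks

        env.execute("Prediction Program (single submission)");
    }
}
```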


Best,
Yangze Guo

On Thu, Apr 8, 2021 at 5:43 PM 太平洋 <495635...@qq.com> wrote:
>
> My application program looks like this. Does this structure has some problem?
>
> public class StreamingJob {
> public static void main(String[] args) throws Exception {
> int i = 0;
> while (i < 100) {
> try {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> env.setParallelism(Parallelism);
>
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner()
> .inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, 
> bsSettings);
>
> bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
> Table t = bsTableEnv.sqlQuery(query);
>
> DataStream<DataPoint> points = bsTableEnv.toAppendStream(t, DataPoint.class);
>
> DataStream<DataPoint> weightPoints = points.map();
>
> DataStream<DataPoint> predictPoints = weightPoints.keyBy()
> .reduce().map();
>
> // side output
> final OutputTag<DataPoint> outPutPredict = new
> OutputTag<DataPoint>("predict") {
> };
>
> SingleOutputStreamOperator<DataPoint> mainDataStream = predictPoints
> .process();
>
> DataStream<DataPoint> exStream =
> mainDataStream.getSideOutput(outPutPredict);
>
> //write data to clickhouse
> String insertIntoCKSql = "xxx";
> mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql, new CkSinkBuilder(),
> new JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
> new 
> JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(CkDriverName)
> .withUrl(CkUrl).withUsername(CkUser).withPassword(CkPassword).build()));
>
> // write data to kafka
> FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
> exStream.map().addSink(producer);
>
> env.execute("Prediction Program");
> } catch (Exception e) {
> e.printStackTrace();
> }
> i++;
> Thread.sleep(window * 1000);
> }
> }
> }
>
>
>
> -- Original message --
> From: "Arvid Heise";
> Date: Thursday, April 8, 2021, 2:33 PM
> To: "Yangze Guo";
> Cc: "太平洋" <495635...@qq.com>; "user"; "guowei.mgw"; "renqschn";
> Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> Hi,
>
> ChildFirstClassLoader are created (more or less) by application jar and 
> seeing so many looks like a classloader leak to me. I'd expect you to see a 
> new ChildFirstClassLoader popping up with each new job submission.
>
> Can you check who is referencing the ChildFirstClassLoader transitively? 
> Usually, it's some thread that is lingering around because some third party 
> library is leaking threads etc.
>
> OneInputStreamTask is legit and just indicates that you have a job running 
> with 4 slots on that TM. It should not hold any dedicated metaspace memory.
>
> On Thu, Apr 8, 2021 at 4:52 AM Yangze Guo  wrote:
>>
>> I went through the JM & TM logs but could not find any valuable clue.
>> The exception is actually thrown by kafka-producer-network-thread.
>> Maybe @Qingsheng could also take a look?
>>
>>
>> Best,
>> Yangze Guo
>>
>> On Thu, Apr 8, 2021 at 10:39 AM 太平洋 <495635...@qq.com> wrote:
>> >
>> > I have configured to 512M, but problem still exist. Now the memory size is 
>> > still 256M.
>> > Attachments are TM and JM logs.
>> >
>> > Look forward to your reply.
>> >
>> > -- Original message --
>> > From: "Yangze Guo";
>> > Date: Tuesday, April 6, 2021, 6:35 PM
>> > To: "太平洋" <495635...@qq.com>;
>> > Cc: "user"; "guowei.mgw";
>> > Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>> >
>> > > I have tried this method, but the problem still exist.
>> > How much memory do you configure for it?
>> >
>> > > is 21 instances of "org.apache.flink.util.ChildFirstClassLoader" normal
>> > Not quite sure about it. AFAIK, each job will have a classloader.
>> > Multiple tasks of the same job in the same TM will share the same
>> > classloader. The classloader will be removed if there is no more task
>> > running on the TM. Classloader without reference will be finally
>> > cleanup by GC. Could you share JM and TM logs for further analysis?

Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-08 Thread 太平洋
My application program looks like this. Does this structure have some problem?


public class StreamingJob {
    public static void main(String[] args) throws Exception {
        int i = 0;
        while (i < 100) {
            try {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.BATCH);
                env.setParallelism(Parallelism);

                EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                        .useBlinkPlanner().inStreamingMode().build();
                StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

                bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
                Table t = bsTableEnv.sqlQuery(query);

                DataStream<DataPoint> points = bsTableEnv.toAppendStream(t, DataPoint.class);
[...]

 > > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace
 > >
 > > Best,
 > > Yangze Guo
 > >
 > > Best,
 > > Yangze Guo
 > >
 > >
 > > On Tue, Apr 6, 2021 at 4:22 PM 太平洋 <495635...@qq.com> wrote:
 > > >
 > > > batch job:
 > > > read data from s3 by sql, then by some operators and write data
 > > > to clickhouse and kafka.
 > > > after some times, task-manager quit with OutOfMemoryError:
 > > > Metaspace.
 > > >
 > > > env:
 > > > flink version: 1.12.2
 > > > task-manager slot count: 5
 > > > deployment: standalone kubernetes session mode
 > > > dependencies:

Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-07 Thread Arvid Heise
Hi,

ChildFirstClassLoaders are created (more or less) per application jar, and
seeing so many looks like a classloader leak to me. I'd expect you to see a
new ChildFirstClassLoader pop up with each new job submission.

Can you check what is referencing the ChildFirstClassLoaders transitively?
Usually it's some thread that lingers around because a third-party
library is leaking threads.

OneInputStreamTask is legit and just indicates that you have a job running
with 4 slots on that TM. It should not hold any dedicated metaspace memory.

On Thu, Apr 8, 2021 at 4:52 AM Yangze Guo  wrote:

> I went through the JM & TM logs but could not find any valuable clue.
> The exception is actually thrown by kafka-producer-network-thread.
> Maybe @Qingsheng could also take a look?
>
>
> Best,
> Yangze Guo
>
> On Thu, Apr 8, 2021 at 10:39 AM 太平洋 <495635...@qq.com> wrote:
> >
> > I have configured to 512M, but problem still exist. Now the memory size
> is still 256M.
> > Attachments are TM and JM logs.
> >
> > Look forward to your reply.
> >
> > -- Original message --
> > From: "Yangze Guo";
> > Date: Tuesday, April 6, 2021, 6:35 PM
> > To: "太平洋" <495635...@qq.com>;
> > Cc: "user"; "guowei.mgw";
> > Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
> >
> > > I have tried this method, but the problem still exist.
> > How much memory do you configure for it?
> >
> > > is 21 instances of "org.apache.flink.util.ChildFirstClassLoader" normal
> > Not quite sure about it. AFAIK, each job will have a classloader.
> > Multiple tasks of the same job in the same TM will share the same
> > classloader. The classloader will be removed if there is no more task
> > running on the TM. Classloader without reference will be finally
> > cleanup by GC. Could you share JM and TM logs for further analysis?
> > I'll also involve @Guowei Ma in this thread.
> >
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Apr 6, 2021 at 6:05 PM 太平洋 <495635...@qq.com> wrote:
> > >
> > > I have tried this method, but the problem still exist.
> > > by heap dump analysis, is 21 instances of
> "org.apache.flink.util.ChildFirstClassLoader" normal?
> > >
> > >
> > > -- Original message --
> > > From: "Yangze Guo";
> > > Date: Tuesday, April 6, 2021, 4:32 PM
> > > To: "太平洋" <495635...@qq.com>;
> > > Cc: "user";
> > > Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
> > >
> > > I think you can try to increase the JVM metaspace option for
> > > TaskManagers through taskmanager.memory.jvm-metaspace.size. [1]
> > >
> > > [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > Best,
> > > Yangze Guo
> > >
> > >
> > > On Tue, Apr 6, 2021 at 4:22 PM 太平洋 <495635...@qq.com> wrote:
> > > >
> > > > batch job:
> > > > read data from s3 by sql,then by some operators and write data to
> clickhouse and kafka.
> > > > after some times, task-manager quit with OutOfMemoryError: Metaspace.
> > > >
> > > > env:
> > > > flink version:1.12.2
> > > > task-manager slot count: 5
> > > > deployment: standalone kubernetes session mode
> > > > dependencies:
> > > >
> > > > <dependency>
> > > >   <groupId>org.apache.flink</groupId>
> > > >   <artifactId>flink-connector-kafka_2.11</artifactId>
> > > >   <version>${flink.version}</version>
> > > > </dependency>
> > > >
> > > > <dependency>
> > > >   <groupId>com.google.code.gson</groupId>
> > > >   <artifactId>gson</artifactId>
> > > >   <version>2.8.5</version>
> > > > </dependency>
> > > >
> > > > <dependency>
> > > >   <groupId>org.apache.flink</groupId>
> > > >   <artifactId>flink-connector-jdbc_2.11</artifactId>
> > > >   <version>${flink.version}</version>
> > > > </dependency>
> > > >
> > > > <dependency>
> > > >   <groupId>ru.yandex.clickhouse</groupId>
> > > >   <artifactId>clickhouse-jdbc</artifactId>
> > > >
> >

Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-07 Thread Yangze Guo
I went through the JM & TM logs but could not find any valuable clue.
The exception is actually thrown by kafka-producer-network-thread.
Maybe @Qingsheng could also take a look?


Best,
Yangze Guo

On Thu, Apr 8, 2021 at 10:39 AM 太平洋 <495635...@qq.com> wrote:
>
> I have configured to 512M, but problem still exist. Now the memory size is 
> still 256M.
> Attachments are TM and JM logs.
>
> Look forward to your reply.
>
> -- Original message --
> From: "Yangze Guo";
> Date: Tuesday, April 6, 2021, 6:35 PM
> To: "太平洋" <495635...@qq.com>;
> Cc: "user"; "guowei.mgw";
> Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> > I have tried this method, but the problem still exist.
> How much memory do you configure for it?
>
> > is 21 instances of "org.apache.flink.util.ChildFirstClassLoader" normal
> Not quite sure about it. AFAIK, each job will have a classloader.
> Multiple tasks of the same job in the same TM will share the same
> classloader. The classloader will be removed if there is no more task
> running on the TM. Classloader without reference will be finally
> cleanup by GC. Could you share JM and TM logs for further analysis?
> I'll also involve @Guowei Ma in this thread.
>
>
> Best,
> Yangze Guo
>
> On Tue, Apr 6, 2021 at 6:05 PM 太平洋 <495635...@qq.com> wrote:
> >
> > I have tried this method, but the problem still exist.
> > by heap dump analysis, is 21 instances of 
> > "org.apache.flink.util.ChildFirstClassLoader" normal?
> >
> >
> > -- Original message --
> > From: "Yangze Guo";
> > Date: Tuesday, April 6, 2021, 4:32 PM
> > To: "太平洋" <495635...@qq.com>;
> > Cc: "user";
> > Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
> >
> > I think you can try to increase the JVM metaspace option for
> > TaskManagers through taskmanager.memory.jvm-metaspace.size. [1]
> >
> > [1] 
> > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace
> >
> > Best,
> > Yangze Guo
> >
> > Best,
> > Yangze Guo
> >
> >
> > On Tue, Apr 6, 2021 at 4:22 PM 太平洋 <495635...@qq.com> wrote:
> > >
> > > batch job:
> > > read data from s3 by sql,then by some operators and write data to 
> > > clickhouse and kafka.
> > > after some times, task-manager quit with OutOfMemoryError: Metaspace.
> > >
> > > env:
> > > flink version:1.12.2
> > > task-manager slot count: 5
> > > deployment: standalone kubernetes session mode
> > > dependencies:
> > >
> > > <dependency>
> > >   <groupId>org.apache.flink</groupId>
> > >   <artifactId>flink-connector-kafka_2.11</artifactId>
> > >   <version>${flink.version}</version>
> > > </dependency>
> > >
> > > <dependency>
> > >   <groupId>com.google.code.gson</groupId>
> > >   <artifactId>gson</artifactId>
> > >   <version>2.8.5</version>
> > > </dependency>
> > >
> > > <dependency>
> > >   <groupId>org.apache.flink</groupId>
> > >   <artifactId>flink-connector-jdbc_2.11</artifactId>
> > >   <version>${flink.version}</version>
> > > </dependency>
> > >
> > > <dependency>
> > >   <groupId>ru.yandex.clickhouse</groupId>
> > >   <artifactId>clickhouse-jdbc</artifactId>
> > >   <version>0.3.0</version>
> > > </dependency>
> > >
> > > <dependency>
> > >   <groupId>org.apache.flink</groupId>
> > >   <artifactId>flink-parquet_2.11</artifactId>
> > >   <version>${flink.version}</version>
> > > </dependency>
> > >
> > > <dependency>
> > >   <groupId>org.apache.flink</groupId>
> > >   <artifactId>flink-json</artifactId>
> > >   <version>${flink.version}</version>
> > > </dependency>
> > >
> > >
> > > heap dump1:
> > >
> > > Leak Suspects
> > >
> > > System Overview
> > >
> > >  Leaks
> > >
> > >  Overview
> > >
> > >
> > >   Problem Suspect 1
> > >
> > > 21 instances of "org.apache.flink.util.ChildFirstClassLoader", loaded by 
> > > "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy 29,656,880 
> > > (41.16%) bytes.
> > >
> > > Biggest instances:
> > >
> > > org.apache.flink.util.

Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-06 Thread Yangze Guo
> I have tried this method, but the problem still exist.
How much memory do you configure for it?

> is 21 instances of "org.apache.flink.util.ChildFirstClassLoader" normal
Not quite sure about it. AFAIK, each job will have a classloader.
Multiple tasks of the same job in the same TM will share the same
classloader. The classloader is removed once no more tasks are
running on the TM, and classloaders without references are eventually
cleaned up by GC. Could you share JM and TM logs for further analysis?
I'll also involve @Guowei Ma in this thread.


Best,
Yangze Guo

On Tue, Apr 6, 2021 at 6:05 PM 太平洋 <495635...@qq.com> wrote:
>
> I have tried this method, but the problem still exist.
> by heap dump analysis, is 21 instances of 
> "org.apache.flink.util.ChildFirstClassLoader" normal?
>
>
> -- Original message --
> From: "Yangze Guo";
> Date: Tuesday, April 6, 2021, 4:32 PM
> To: "太平洋" <495635...@qq.com>;
> Cc: "user";
> Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> I think you can try to increase the JVM metaspace option for
> TaskManagers through taskmanager.memory.jvm-metaspace.size. [1]
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace
>
> Best,
> Yangze Guo
>
> Best,
> Yangze Guo
>
>
> On Tue, Apr 6, 2021 at 4:22 PM 太平洋 <495635...@qq.com> wrote:
> >
> > batch job:
> > read data from s3 by sql,then by some operators and write data to 
> > clickhouse and kafka.
> > after some times, task-manager quit with OutOfMemoryError: Metaspace.
> >
> > env:
> > flink version:1.12.2
> > task-manager slot count: 5
> > deployment: standalone kubernetes session mode
> > dependencies:
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-connector-kafka_2.11</artifactId>
> >   <version>${flink.version}</version>
> > </dependency>
> >
> > <dependency>
> >   <groupId>com.google.code.gson</groupId>
> >   <artifactId>gson</artifactId>
> >   <version>2.8.5</version>
> > </dependency>
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-connector-jdbc_2.11</artifactId>
> >   <version>${flink.version}</version>
> > </dependency>
> >
> > <dependency>
> >   <groupId>ru.yandex.clickhouse</groupId>
> >   <artifactId>clickhouse-jdbc</artifactId>
> >   <version>0.3.0</version>
> > </dependency>
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-parquet_2.11</artifactId>
> >   <version>${flink.version}</version>
> > </dependency>
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-json</artifactId>
> >   <version>${flink.version}</version>
> > </dependency>
> >
> >
> > heap dump1:
> >
> > Leak Suspects
> >
> > System Overview
> >
> >  Leaks
> >
> >  Overview
> >
> >
> >   Problem Suspect 1
> >
> > 21 instances of "org.apache.flink.util.ChildFirstClassLoader", loaded by 
> > "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy 29,656,880 (41.16%) 
> > bytes.
> >
> > Biggest instances:
> >
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73ca2a1e8 - 1,474,760 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73d2af820 - 1,474,168 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73cdcaa10 - 1,474,160 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73cf6aab0 - 1,474,160 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73dd8 - 1,474,160 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73d2bb108 - 1,474,128 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73de202e0 - 1,474,120 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73dadc778 - 1,474,112 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73d5f70e8 - 1,474,064 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73d93aa38 - 1,474,064 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73e179638 - 1,474,064 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73dc80418 - 1,474,056 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73dfcda60 - 1,474,056 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73e4bcd38 - 1,474,056 
> > (2.05%) bytes.
> > 

Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-06 Thread 太平洋
I have tried this method, but the problem still exists.
From heap dump analysis: are 21 instances of
"org.apache.flink.util.ChildFirstClassLoader" normal?




-- Original message --
From: "Yangze Guo"
[...]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace

Best,
Yangze Guo

Best,
Yangze Guo


On Tue, Apr 6, 2021 at 4:22 PM 太平洋 <495635...@qq.com> wrote:
>
> batch job:
> read data from s3 by sql,then by some operators and write data to 
clickhouse and kafka.
> after some times, task-manager quit with OutOfMemoryError: Metaspace.
>
> env:
> flink version:1.12.2
> task-manager slot count: 5
> deployment: standalone kubernetes session mode
> dependencies:
>
> 

Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-06 Thread Yangze Guo
I think you can try to increase the JVM metaspace option for
TaskManagers through taskmanager.memory.jvm-metaspace.size. [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace
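
For a standalone session cluster this is a flink-conf.yaml setting on the TaskManagers; a minimal sketch (the 512m value is only an example, not a recommendation from this thread):

```
# flink-conf.yaml (Flink 1.12): raise the TaskManager JVM metaspace cap
# (default is 256m). The TaskManagers must be restarted for the new
# -XX:MaxMetaspaceSize to take effect.
taskmanager.memory.jvm-metaspace.size: 512m
```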

Best,
Yangze Guo

Best,
Yangze Guo


On Tue, Apr 6, 2021 at 4:22 PM 太平洋 <495635...@qq.com> wrote:
>
> batch job:
> read data from s3 by sql,then by some operators and write data to clickhouse 
> and kafka.
> after some times, task-manager quit with OutOfMemoryError: Metaspace.
>
> env:
> flink version:1.12.2
> task-manager slot count: 5
> deployment: standalone kubernetes session mode
> dependencies:
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka_2.11</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>com.google.code.gson</groupId>
>   <artifactId>gson</artifactId>
>   <version>2.8.5</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-jdbc_2.11</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>ru.yandex.clickhouse</groupId>
>   <artifactId>clickhouse-jdbc</artifactId>
>   <version>0.3.0</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-parquet_2.11</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-json</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
>
> heap dump1:
>
> Leak Suspects
>
> System Overview
>
>  Leaks
>
>  Overview
>
>
>   Problem Suspect 1
>
> 21 instances of "org.apache.flink.util.ChildFirstClassLoader", loaded by 
> "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy 29,656,880 (41.16%) 
> bytes.
>
> Biggest instances:
>
> org.apache.flink.util.ChildFirstClassLoader @ 0x73ca2a1e8 - 1,474,760 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73d2af820 - 1,474,168 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73cdcaa10 - 1,474,160 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73cf6aab0 - 1,474,160 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73dd8 - 1,474,160 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73d2bb108 - 1,474,128 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73de202e0 - 1,474,120 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73dadc778 - 1,474,112 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73d5f70e8 - 1,474,064 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73d93aa38 - 1,474,064 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e179638 - 1,474,064 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73dc80418 - 1,474,056 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73dfcda60 - 1,474,056 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e4bcd38 - 1,474,056 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73d6006e8 - 1,474,032 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73c7d2ad8 - 1,461,944 (2.03%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73ca1bb98 - 1,460,752 (2.03%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73bf203f0 - 1,460,744 (2.03%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e3284a8 - 1,445,232 (2.01%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e65de00 - 1,445,232 (2.01%) 
> bytes.
>
>
>
> Keywords
> org.apache.flink.util.ChildFirstClassLoader
> sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0
>
>   Problem Suspect 2
>
> 34,407 instances of "org.apache.flink.core.memory.HybridMemorySegment", 
> loaded by "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy 7,707,168 
> (10.70%) bytes.
>
> Keywords
> org.apache.flink.core.memory.HybridMemorySegment
> sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0
>
>
>
>
> heap dump2:
>
> Leak Suspects
>
> System Overview
>
>  Leaks
>
>  Overview
>
>   Problem Suspect 1
>
> 21 instances of "org.apache.flink.util.ChildFirstClassLoader", loaded by 
> "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy 26,061,408 (30.68%) 
> bytes.
>
> Biggest instances:
>
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e9e9930 - 1,474,224 (1.74%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73edce0b8 - 1,474,224 (1.74%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73f1ad7d0 - 1,474,168 (1.74%) 
> bytes.
> org.apache.flink.util.C

period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-06 Thread 太平洋
batch job:
read data from s3 by sql, then by some operators, and write data to clickhouse
and kafka.
after some time, the task-manager quits with OutOfMemoryError: Metaspace.


env:
flink version: 1.12.2
task-manager slot count: 5
deployment: standalone kubernetes session mode
dependencies:

Re: OutOfMemoryError while doing join operation in flink

2018-11-28 Thread Fabian Hueske
Hi,

Flink handles large data volumes quite well; large records are a bit
trickier to tune.
You could try to reduce the number of parallel tasks per machine (#slots
per TM, #TMs per machine) and/or increase the amount of available JVM
memory (possibly in exchange for managed memory, as Zhijiang suggested).

Best, Fabian

On Wed, Nov 28, 2018 at 07:44, Akshay Mendole <akshaymend...@gmail.com> wrote:

> Hi Zhijiang,
>   Thanks for the explanation and the workaround suggested.
> While this can work for the example stated above, we have more complex use
> cases where we would have to re-tune the above parameters. FYI, we ran into
> same problems when we did a simple groupBy on the skewed dataset.
> Thanks,
> Akshay
>
>
> On Fri, Nov 23, 2018 at 8:29 AM zhijiang 
> wrote:
>
>> Hi Akshay,
>>
>> Sorrry I have not thought of a proper way to handle single large record
>> in distributed task managers in flink. But I can give some hints for
>> adjusting the related memories for work around OOM issue.
>> Large fraction of memories in task manager are managed by flink for
>> efficiency, and these memories are long live persistent in JVM not recycled
>> by gc. You can check the parameter "taskmanager.memory.fraction" for this
>> and the default value is 0.7 if you have not changed, that means 7GB * 0.7
>> are used by framework.
>>
>> I am not sure what is the flink version you used. If I rememberd
>> correctly, before release-1.5 the network buffers also uses heap memories
>> by default, so you should also minus this part of memory from total task
>> manager memory.
>>
>> If not considering network buffer used by framework, you only leave 7GB *
>> 0.3 temporaray memories for other parts. The temporaray memories in
>> serializer will exceed twice as current size every time if not covering the
>> record size, that means one serializer may need 2GB overhead memories for
>> your 1GB record. You have 2 slots per task manager for running two tasks,
>> so the total overhead memories may need 4GB almost. So you can decrease
>> the "taskmanager.memory.fraction" in low fraction or increase the total
>> task manager to cover this overhead memories, or set one slot for each task
>> manager.
>>
>> Best,
>> Zhijiang
>>
>> --
>> From: Akshay Mendole
>> Date: Friday, November 23, 2018, 02:54
>> To: trohrmann
>> Cc: zhijiang; user; Shreesha Madogaran
>> Subject: Re: OutOfMemoryError while doing join operation in flink
>>
>> Hi,
>> Thanks for your reply. I tried running a simple "group by" on just
>> one dataset where few keys are repeatedly occurring (in order of millions)
>> and did not include any joins. I wanted to see if this issue is specific to
>> join. But as I was expecting, I ran into the same issue. I am giving 7GBs
>> to each task manager with 2 slots per task manager. From what I understood
>> so far, such cases where individual records somewhere in the pipeline
>> become so large that they should be handled in distributed manner instead
>> of handling them by a simple data structure in single JVM. I am guessing
>> there is no way to do this in Flink today.
>> Could you please confirm this?
>> Thanks,
>> Akshay
>>
>>
>> On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann 
>> wrote:
>> Hi Akshay,
>>
>> Flink currently does not support to automatically distribute hot keys
>> across different JVMs. What you can do is to adapt the parallelism/number
>> of partitions manually if you encounter that one partition contains a lot
>> of hot keys. This might mitigate the problem by partitioning the hot keys
>> into different partitions.
>>
>> Apart from that, the problem seems to be as Zhijiang indicated that your
>> join result is quite large. One record is 1 GB large. Try to decrease it or
>> give more memory to your TMs.
>>
>> Cheers,
>> Till
>>
>> On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole 
>> wrote:
>> Hi Zhijiang,
>>  Thanks for the quick reply. My concern is more towards
>> how flink perform joins of two *skewed *datasets. Pig
>> <https://wiki.apache.org/pig/PigSkewedJoinSpec> and spark
>> <https://wiki.apache.org/pig/PigSkewedJoinSpec> seems to support the
>> join of skewed datasets. The record size that you are mentioning about in
>> your reply is after join operation takes place which is definitely going to
>> be huge enough not to fit in jvm task manager task slot in my use case. W

Re: OutOfMemoryError while doing join operation in flink

2018-11-27 Thread Akshay Mendole
Hi Zhijiang,
  Thanks for the explanation and the workaround suggested.
While this can work for the example stated above, we have more complex use
cases where we would have to re-tune the above parameters. FYI, we ran into
the same problems when we did a simple groupBy on the skewed dataset.
Thanks,
Akshay


On Fri, Nov 23, 2018 at 8:29 AM zhijiang  wrote:

> Hi Akshay,
>
> Sorrry I have not thought of a proper way to handle single large record in
> distributed task managers in flink. But I can give some hints for adjusting
> the related memories for work around OOM issue.
> Large fraction of memories in task manager are managed by flink for
> efficiency, and these memories are long live persistent in JVM not recycled
> by gc. You can check the parameter "taskmanager.memory.fraction" for this
> and the default value is 0.7 if you have not changed, that means 7GB * 0.7
> are used by framework.
>
> I am not sure what is the flink version you used. If I rememberd
> correctly, before release-1.5 the network buffers also uses heap memories
> by default, so you should also minus this part of memory from total task
> manager memory.
>
> If not considering network buffer used by framework, you only leave 7GB *
> 0.3 temporaray memories for other parts. The temporaray memories in
> serializer will exceed twice as current size every time if not covering the
> record size, that means one serializer may need 2GB overhead memories for
> your 1GB record. You have 2 slots per task manager for running two tasks,
> so the total overhead memories may need 4GB almost. So you can decrease
> the "taskmanager.memory.fraction" in low fraction or increase the total
> task manager to cover this overhead memories, or set one slot for each task
> manager.
>
> Best,
> Zhijiang
>
> --
> From: Akshay Mendole
> Date: Friday, November 23, 2018, 02:54
> To: trohrmann
> Cc: zhijiang; user; Shreesha Madogaran
> Subject: Re: OutOfMemoryError while doing join operation in flink
>
> Hi,
> Thanks for your reply. I tried running a simple "group by" on just one
> dataset where few keys are repeatedly occurring (in order of millions)  and
> did not include any joins. I wanted to see if this issue is specific to
> join. But as I was expecting, I ran into the same issue. I am giving 7GBs
> to each task manager with 2 slots per task manager. From what I understood
> so far, such cases where individual records somewhere in the pipeline
> become so large that they should be handled in distributed manner instead
> of handling them by a simple data structure in single JVM. I am guessing
> there is no way to do this in Flink today.
> Could you please confirm this?
> Thanks,
> Akshay
>
>
> On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann 
> wrote:
> Hi Akshay,
>
> Flink currently does not support to automatically distribute hot keys
> across different JVMs. What you can do is to adapt the parallelism/number
> of partitions manually if you encounter that one partition contains a lot
> of hot keys. This might mitigate the problem by partitioning the hot keys
> into different partitions.
>
> Apart from that, the problem seems to be as Zhijiang indicated that your
> join result is quite large. One record is 1 GB large. Try to decrease it or
> give more memory to your TMs.
>
> Cheers,
> Till
>
> On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole 
> wrote:
> Hi Zhijiang,
>  Thanks for the quick reply. My concern is more towards
> how flink perform joins of two *skewed *datasets. Pig
> <https://wiki.apache.org/pig/PigSkewedJoinSpec> and spark
> <https://wiki.apache.org/pig/PigSkewedJoinSpec> seems to support the join
> of skewed datasets. The record size that you are mentioning about in your
> reply is after join operation takes place which is definitely going to be
> huge enough not to fit in jvm task manager task slot in my use case. We
> want to know if there is a way in flink to handle such skewed keys by
> distributing their values across different jvms. Let me know if you need
> more clarity on the issue.
> Thanks,
> Akshay
>
> On Thu, Nov 22, 2018 at 2:38 PM zhijiang 
> wrote:
> Hi Akshay,
>
> You encountered an existing issue for serializing large records to cause
> OOM.
>
> Every subpartition would create a separate serializer before, and each
> serializer would maintain an internal bytes array for storing intermediate
> serialization results. The key point is that these overhead internal bytes
> array are not managed by framework, and their size would exceed with the
> record size dynamically. If your job has many subpartitions with large
> re

Re: OutOfMemoryError while doing join operation in flink

2018-11-23 Thread Ken Krugler
Hi Akshay,

I don’t know much about the Beam/Flink integration, but I’m curious why you 
have a single record that would contain all 8M records with the same key.

E.g. the code for your simple “group by” test would be interesting.

— Ken


> On Nov 22, 2018, at 10:54 AM, Akshay Mendole  wrote:
> 
> Hi,
> Thanks for your reply. I tried running a simple "group by" on just one 
> dataset where few keys are repeatedly occurring (in order of millions)  and 
> did not include any joins. I wanted to see if this issue is specific to join. 
> But as I was expecting, I ran into the same issue. I am giving 7GBs to each 
> task manager with 2 slots per task manager. From what I understood so far, 
> such cases where individual records somewhere in the pipeline become so large 
> that they should be handled in distributed manner instead of handling them by 
> a simple data structure in single JVM. I am guessing there is no way to do 
> this in Flink today. 
> Could you please confirm this?
> Thanks,
> Akshay
> 
> 
> On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
> Hi Akshay,
> 
> Flink currently does not support to automatically distribute hot keys across 
> different JVMs. What you can do is to adapt the parallelism/number of 
> partitions manually if you encounter that one partition contains a lot of hot 
> keys. This might mitigate the problem by partitioning the hot keys into 
> different partitions.
> 
> Apart from that, the problem seems to be as Zhijiang indicated that your join 
> result is quite large. One record is 1 GB large. Try to decrease it or give 
> more memory to your TMs.
> 
> Cheers,
> Till
> 
> On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole <akshaymend...@gmail.com> wrote:
> Hi Zhijiang,
>  Thanks for the quick reply. My concern is more towards how 
> flink perform joins of two skewed datasets. Pig 
> <https://wiki.apache.org/pig/PigSkewedJoinSpec> and spark 
> <https://wiki.apache.org/pig/PigSkewedJoinSpec> seems to support the join of 
> skewed datasets. The record size that you are mentioning about in your reply 
> is after join operation takes place which is definitely going to be huge 
> enough not to fit in jvm task manager task slot in my use case. We want to 
> know if there is a way in flink to handle such skewed keys by distributing 
> their values across different jvms. Let me know if you need more clarity on 
> the issue.
> Thanks, 
> Akshay 
> 
> On Thu, Nov 22, 2018 at 2:38 PM zhijiang <wangzhijiang...@aliyun.com> wrote:
> Hi Akshay,
> 
> You encountered an existing issue for serializing large records to cause OOM.
> 
> Every subpartition would create a separate serializer before, and each 
> serializer would maintain an internal bytes array for storing intermediate 
> serialization results. The key point is that these overhead internal bytes 
> array are not managed by framework, and their size would exceed with the 
> record size dynamically. If your job has many subpartitions with large 
> records, it may probably cause OOM issue.
> 
> I already improved this issue to some extent by sharing only one serializer 
> for all subpartitions [1], that means we only have one bytes array overhead 
> at most. This issue is covered in release-1.7.
> Currently the best option may reduce your record size if possible or you can 
> increase the heap size of task manager container.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-9913 
> <https://issues.apache.org/jira/browse/FLINK-9913>
> 
> Best,
> Zhijiang
> --
> From: Akshay Mendole <akshaymend...@gmail.com>
> Date: Thursday, November 22, 2018, 13:43
> To: user <user@flink.apache.org>
> Subject: OutOfMemoryError while doing join operation in flink
> 
> Hi,
> We are converting one of our pig pipelines to flink using apache beam. 
> The pig pipeline reads two different data sets (R1 & R2)  from hdfs, enriches 
> them, joins them and dumps back to hdfs. The data set R1 is skewed. In a 
> sense, it has few keys with lot of records. When we converted the pig 
> pipeline to apache beam and ran it using flink on a production yarn cluster, 
> we got the following error 
> 
> 2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask
>   - Error in task code:  GroupReduce (GroupReduce at CoGBK/GBK) 
> (25/100)
> java.lang.RuntimeException: Emitting the record caused an I/O exception: 
> Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM 
> heap space
> at 
> org.apa

Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread zhijiang
Hi Akshay,

Sorry, I have not thought of a proper way to handle a single large record across
distributed task managers in Flink, but I can give some hints for adjusting the
related memory settings to work around the OOM issue.
A large fraction of the memory in a task manager is managed by Flink for efficiency,
and this memory is long-lived in the JVM, not recycled by GC. You can
check the parameter "taskmanager.memory.fraction" for this; the default
value is 0.7 if you have not changed it, which means 7GB * 0.7 is used by the
framework.

I am not sure which Flink version you use. If I remember correctly,
before release 1.5 the network buffers also used heap memory by default, so
you should subtract that part as well from the total task manager memory.

Not counting the network buffers used by the framework, that leaves only 7GB * 0.3
of temporary memory for everything else. The temporary buffer in a serializer
doubles in size whenever it cannot hold the record, which means
one serializer may need 2GB of overhead memory for your 1GB record. You
have 2 slots per task manager running two tasks, so the total overhead
memory may approach 4GB. So you can decrease
"taskmanager.memory.fraction" to a lower fraction, increase the total task
manager memory to cover this overhead, or set one slot per task manager.
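
Expressed as configuration, that advice might look like the following sketch for a pre-1.10 standalone setup (the values are illustrative assumptions, not recommendations from this thread):

```
# Legacy (pre-Flink-1.10) TaskManager settings sketched from the advice
# above: lower the managed-memory fraction to leave more free heap for
# serialization buffers, and run one slot per TaskManager.
taskmanager.heap.size: 7g
taskmanager.memory.fraction: 0.5    # default 0.7; lower => more free heap
taskmanager.numberOfTaskSlots: 1
```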

Best,
Zhijiang


--
From: Akshay Mendole
Date: Friday, November 23, 2018, 02:54
To: trohrmann
Cc: zhijiang; user; Shreesha Madogaran
Subject: Re: OutOfMemoryError while doing join operation in flink

Hi,
Thanks for your reply. I tried running a simple "group by" on just one 
dataset where few keys are repeatedly occurring (in order of millions)  and did 
not include any joins. I wanted to see if this issue is specific to join. But 
as I was expecting, I ran into the same issue. I am giving 7GBs to each task 
manager with 2 slots per task manager. From what I understood so far, such 
cases where individual records somewhere in the pipeline become so large that 
they should be handled in distributed manner instead of handling them by a 
simple data structure in single JVM. I am guessing there is no way to do this 
in Flink today. 
Could you please confirm this?
Thanks,
Akshay


On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann  wrote:
Hi Akshay,

Flink currently does not support to automatically distribute hot keys across 
different JVMs. What you can do is to adapt the parallelism/number of 
partitions manually if you encounter that one partition contains a lot of hot 
keys. This might mitigate the problem by partitioning the hot keys into 
different partitions.

Apart from that, the problem seems to be as Zhijiang indicated that your join 
result is quite large. One record is 1 GB large. Try to decrease it or give 
more memory to your TMs.

Cheers,
Till
On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole  wrote:
Hi Zhijiang,
 Thanks for the quick reply. My concern is more towards how 
flink perform joins of two skewed datasets. Pig and spark seems to support the 
join of skewed datasets. The record size that you are mentioning about in your 
reply is after join operation takes place which is definitely going to be huge 
enough not to fit in jvm task manager task slot in my use case. We want to know 
if there is a way in flink to handle such skewed keys by distributing their 
values across different jvms. Let me know if you need more clarity on the issue.
Thanks, 
Akshay 
On Thu, Nov 22, 2018 at 2:38 PM zhijiang  wrote:
Hi Akshay,

You encountered an existing issue for serializing large records to cause OOM.

Every subpartition would create a separate serializer before, and each 
serializer would maintain an internal bytes array for storing intermediate 
serialization results. The key point is that these overhead internal bytes 
array are not managed by framework, and their size would exceed with the record 
size dynamically. If your job has many subpartitions with large records, it may 
probably cause OOM issue.

I already improved this issue to some extent by sharing only one serializer for 
all subpartitions [1], that means we only have one bytes array overhead at 
most. This issue is covered in release-1.7.
Currently the best option may reduce your record size if possible or you can 
increase the heap size of task manager container.

[1] https://issues.apache.org/jira/browse/FLINK-9913

Best,
Zhijiang
--
From: Akshay Mendole
Date: Thursday, November 22, 2018, 13:43
To: user
Subject: OutOfMemoryError while doing join operation in flink

Hi,
We are converting one of our pig pipelines to flink using apache beam. The 
pig pipeline reads two different data sets (R1 & R2)  from hdfs, enriches them, 
joins them and dumps back to hdfs. The data set R1 is skewed. In a sense, it 
has a few keys with a lot of records. When we converted the pig pipeline to apache
beam and ran it using flink on a production yarn cluster, we got the following error

Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread Akshay Mendole
Hi,
Thanks for your reply. I tried running a simple "group by" on just one
dataset where a few keys occur repeatedly (on the order of millions) and
did not include any joins. I wanted to see whether this issue is specific to
join, but as I expected, I ran into the same issue. I am giving 7GB
to each task manager, with 2 slots per task manager. From what I understand
so far, cases where individual records somewhere in the pipeline
become very large should be handled in a distributed manner instead
of by a simple data structure in a single JVM. I am guessing
there is no way to do this in Flink today.
Could you please confirm this?
Thanks,
Akshay


On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann  wrote:

> Hi Akshay,
>
> Flink currently does not support to automatically distribute hot keys
> across different JVMs. What you can do is to adapt the parallelism/number
> of partitions manually if you encounter that one partition contains a lot
> of hot keys. This might mitigate the problem by partitioning the hot keys
> into different partitions.
>
> Apart from that, the problem seems to be as Zhijiang indicated that your
> join result is quite large. One record is 1 GB large. Try to decrease it or
> give more memory to your TMs.
>
> Cheers,
> Till
>
> On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole 
> wrote:
>
>> Hi Zhijiang,
>>  Thanks for the quick reply. My concern is more towards
>> how flink perform joins of two *skewed *datasets. Pig
>> <https://wiki.apache.org/pig/PigSkewedJoinSpec> and spark
>> <https://wiki.apache.org/pig/PigSkewedJoinSpec> seems to support the
>> join of skewed datasets. The record size that you are mentioning about in
>> your reply is after join operation takes place which is definitely going to
>> be huge enough not to fit in jvm task manager task slot in my use case. We
>> want to know if there is a way in flink to handle such skewed keys by
>> distributing their values across different jvms. Let me know if you need
>> more clarity on the issue.
>> Thanks,
>> Akshay
>>
>> On Thu, Nov 22, 2018 at 2:38 PM zhijiang 
>> wrote:
>>
>>> Hi Akshay,
>>>
>>> You encountered an existing issue for serializing large records to cause
>>> OOM.
>>>
>>> Every subpartition would create a separate serializer before, and each
>>> serializer would maintain an internal bytes array for storing intermediate
>>> serialization results. The key point is that these overhead internal bytes
>>> array are not managed by framework, and their size would exceed with the
>>> record size dynamically. If your job has many subpartitions with large
>>> records, it may probably cause OOM issue.
>>>
>>> I already improved this issue to some extent by sharing only one
>>> serializer for all subpartitions [1], that means we only have one bytes
>>> array overhead at most. This issue is covered in release-1.7.
>>> Currently the best option may reduce your record size if possible or you
>>> can increase the heap size of task manager container.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-9913
>>>
>>> Best,
>>> Zhijiang
>>>
>>> --
>>> From: Akshay Mendole
>>> Date: Thursday, November 22, 2018, 13:43
>>> To: user
>>> Subject: OutOfMemoryError while doing join operation in flink
>>>
>>> Hi,
>>> We are converting one of our pig pipelines to flink using apache
>>> beam. The pig pipeline reads two different data sets (R1 & R2)  from hdfs,
>>> enriches them, joins them and dumps back to hdfs. The data set R1 is
>>> skewed. In a sense, it has few keys with lot of records. When we converted
>>> the pig pipeline to apache beam and ran it using flink on a production yarn
>>> cluster, we got the following error
>>>
>>> 2018-11-21 16:52:25,307 ERROR
>>> org.apache.flink.runtime.operators.BatchTask  - Error in
>>> task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
>>> java.lang.RuntimeException: Emitting the record caused an I/O exception:
>>> Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds
>>> JVM heap space
>>> at
>>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
>>> at
>>> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>> at
>>> org.ap

Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread Till Rohrmann
Hi Akshay,

Flink currently does not support automatically distributing hot keys
across different JVMs. What you can do is adapt the parallelism/number
of partitions manually if you find that one partition contains a lot
of hot keys. This might mitigate the problem by spreading the hot keys
across different partitions.
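
One manual way to do that partitioning is to "salt" the hot keys so that their records spread over several parallel pre-aggregations, and then merge the partial results in a second pass. A minimal DataSet-API sketch of the idea, with all names and values hypothetical (this is not a built-in Flink feature):

```
import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

// Hypothetical two-phase aggregation for skewed keys: phase 1
// pre-aggregates per (key, salt) so that no single task sees all values
// of a hot key; phase 2 drops the salt and merges the partial results.
public class SaltedGroupBy {
    static final int SALT_BUCKETS = 16; // spread each hot key over 16 partitions

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Long>> input = env.fromElements(
                Tuple2.of("hotKey", 1L), Tuple2.of("hotKey", 2L), Tuple2.of("rareKey", 3L));

        DataSet<Tuple2<String, Long>> result = input
                // phase 1: attach a random salt and pre-aggregate per (key, salt)
                .map(t -> Tuple3.of(t.f0, ThreadLocalRandom.current().nextInt(SALT_BUCKETS), t.f1))
                .returns(Types.TUPLE(Types.STRING, Types.INT, Types.LONG))
                .groupBy(0, 1)
                .aggregate(Aggregations.SUM, 2)
                // phase 2: drop the salt and merge the partial aggregates
                .map(t -> Tuple2.of(t.f0, t.f2))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .groupBy(0)
                .aggregate(Aggregations.SUM, 1);

        result.print();
    }
}
```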

Apart from that, the problem seems to be as Zhijiang indicated that your
join result is quite large. One record is 1 GB large. Try to decrease it or
give more memory to your TMs.

Cheers,
Till

On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole 
wrote:

> Hi Zhijiang,
>  Thanks for the quick reply. My concern is more towards
> how flink perform joins of two *skewed *datasets. Pig
> <https://wiki.apache.org/pig/PigSkewedJoinSpec> and spark
> <https://wiki.apache.org/pig/PigSkewedJoinSpec> seems to support the join
> of skewed datasets. The record size that you are mentioning about in your
> reply is after join operation takes place which is definitely going to be
> huge enough not to fit in jvm task manager task slot in my use case. We
> want to know if there is a way in flink to handle such skewed keys by
> distributing their values across different jvms. Let me know if you need
> more clarity on the issue.
> Thanks,
> Akshay
>
> On Thu, Nov 22, 2018 at 2:38 PM zhijiang 
> wrote:
>
>> Hi Akshay,
>>
>> You encountered an existing issue for serializing large records to cause
>> OOM.
>>
>> Every subpartition would create a separate serializer before, and each
>> serializer would maintain an internal bytes array for storing intermediate
>> serialization results. The key point is that these overhead internal bytes
>> array are not managed by framework, and their size would exceed with the
>> record size dynamically. If your job has many subpartitions with large
>> records, it may probably cause OOM issue.
>>
>> I already improved this issue to some extent by sharing only one
>> serializer for all subpartitions [1], that means we only have one bytes
>> array overhead at most. This issue is covered in release-1.7.
>> Currently the best option may reduce your record size if possible or you
>> can increase the heap size of task manager container.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9913
>>
>> Best,
>> Zhijiang
>>
>> --
>> From: Akshay Mendole
>> Date: Thursday, November 22, 2018, 13:43
>> To: user
>> Subject: OutOfMemoryError while doing join operation in flink
>>
>> Hi,
>> We are converting one of our pig pipelines to flink using apache
>> beam. The pig pipeline reads two different data sets (R1 & R2)  from hdfs,
>> enriches them, joins them and dumps back to hdfs. The data set R1 is
>> skewed. In a sense, it has few keys with lot of records. When we converted
>> the pig pipeline to apache beam and ran it using flink on a production yarn
>> cluster, we got the following error
>>
>> 2018-11-21 16:52:25,307 ERROR
>> org.apache.flink.runtime.operators.BatchTask  - Error in
>> task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
>> java.lang.RuntimeException: Emitting the record caused an I/O exception:
>> Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds
>> JVM heap space
>> at
>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
>> at
>> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>> at
>> org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
>> at
>> org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
>> at
>> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
>> at
>> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>> at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>> at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Failed to serialize element. Serialized
>> size (> 1136656562 bytes) exceeds JVM heap space
>> at
>> org.apache.flink.core.memory

Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread Akshay Mendole
Hi Zhijiang,
 Thanks for the quick reply. My concern is more about how
Flink performs joins of two *skewed* datasets. Pig
<https://wiki.apache.org/pig/PigSkewedJoinSpec> and Spark
seem to support joins of skewed datasets. The record size you mention
in your reply is the size after the join takes place, which in my use case
is definitely too large to fit in a task manager slot's JVM. We
want to know if there is a way in Flink to handle such skewed keys by
distributing their values across different JVMs. Let me know if you need
more clarity on the issue.
Thanks,
Akshay

On Thu, Nov 22, 2018 at 2:38 PM zhijiang  wrote:

> Hi Akshay,
>
> You encountered an existing issue for serializing large records to cause
> OOM.
>
> Every subpartition would create a separate serializer before, and each
> serializer would maintain an internal bytes array for storing intermediate
> serialization results. The key point is that these overhead internal bytes
> array are not managed by framework, and their size would exceed with the
> record size dynamically. If your job has many subpartitions with large
> records, it may probably cause OOM issue.
>
> I already improved this issue to some extent by sharing only one
> serializer for all subpartitions [1], that means we only have one bytes
> array overhead at most. This issue is covered in release-1.7.
> Currently the best option may reduce your record size if possible or you
> can increase the heap size of task manager container.
>
> [1] https://issues.apache.org/jira/browse/FLINK-9913
>
> Best,
> Zhijiang
>
> --
> From: Akshay Mendole
> Date: Thursday, November 22, 2018, 13:43
> To: user
> Subject: OutOfMemoryError while doing join operation in flink
>
> Hi,
> We are converting one of our pig pipelines to flink using apache beam.
> The pig pipeline reads two different data sets (R1 & R2)  from hdfs,
> enriches them, joins them and dumps back to hdfs. The data set R1 is
> skewed. In a sense, it has few keys with lot of records. When we converted
> the pig pipeline to apache beam and ran it using flink on a production yarn
> cluster, we got the following error
>
> 2018-11-21 16:52:25,307 ERROR
> org.apache.flink.runtime.operators.BatchTask  - Error in
> task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
> java.lang.RuntimeException: Emitting the record caused an I/O exception:
> Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds
> JVM heap space
> at
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
> at
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> at
> org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
> at
> org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
> at
> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
> at
> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to serialize element. Serialized
> size (> 1136656562 bytes) exceeds JVM heap space
> at
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
> at
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
> at
> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
> at java.io.DataOutputStream.write(DataOutputStream.java:107)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at
> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
> at
> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>  

Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread zhijiang
Hi Akshay,

You have hit a known issue where serializing large records causes an OOM.

Previously, every subpartition created a separate serializer, and each 
serializer maintained an internal byte array for storing intermediate 
serialization results. The key point is that these internal byte arrays are 
overhead not managed by the framework, and they grow dynamically with the 
record size. If your job has many subpartitions with large records, this can 
quite easily cause an OOM.

I have already improved this to some extent by sharing a single serializer for 
all subpartitions [1], which means there is at most one byte-array overhead. 
The fix is included in release 1.7.
For now, the best options are to reduce your record size if possible or to 
increase the heap size of the task manager container.

[1] https://issues.apache.org/jira/browse/FLINK-9913

Best,
Zhijiang
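
For reference, the second option Zhijiang mentions — a larger task manager heap — is a one-line change in flink-conf.yaml; the exact key depends on the Flink version, and the value here is purely illustrative:

```
# flink-conf.yaml — illustrative value only
taskmanager.heap.size: 8192m   # Flink 1.7+; older versions use taskmanager.heap.mb: 8192
```

On YARN the same increase can be requested per session with the -tm (yarn-session.sh) or -ytm (flink run) flags.
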
--
From: Akshay Mendole
Sent: Thursday, November 22, 2018, 13:43
To: user
Subject: OutOfMemoryError while doing join operation in flink

Hi,
We are converting one of our Pig pipelines to Flink using Apache Beam. The 
Pig pipeline reads two different data sets (R1 & R2) from HDFS, enriches them, 
joins them and dumps the result back to HDFS. The data set R1 is skewed: it 
has a few keys with a lot of records. When we converted the Pig pipeline to 
Apache Beam and ran it using Flink on a production YARN cluster, we got the 
following error 

2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask  
- Error in task code:  GroupReduce (GroupReduce at CoGBK/GBK) 
(25/100)
java.lang.RuntimeException: Emitting the record caused an I/O exception: Failed 
to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap 
space
at 
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at 
org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
at 
org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
at 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
at 
org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 
1136656562 bytes) exceeds JVM heap space
at 
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
at 
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
at 
org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at 
java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
at 
java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at 
java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
at 
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
at 
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at 
org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
at 
org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
at 
org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
at 
org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
at 
org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCod

OutOfMemoryError while doing join operation in flink

2018-11-21 Thread Akshay Mendole
Hi,
We are converting one of our Pig pipelines to Flink using Apache Beam.
The Pig pipeline reads two different data sets (R1 & R2) from HDFS,
enriches them, joins them and dumps the result back to HDFS. The data set
R1 is skewed: it has a few keys with a lot of records. When we converted
the Pig pipeline to Apache Beam and ran it using Flink on a production
YARN cluster, we got the following error

2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask
- Error in task code:  GroupReduce (GroupReduce at
CoGBK/GBK) (25/100)
java.lang.RuntimeException: Emitting the record caused an I/O exception:
Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds
JVM heap space
at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at
org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
at
org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
at
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
at
org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to serialize element. Serialized
size (> 1136656562 bytes) exceeds JVM heap space
at
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
at
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
at
org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at
java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
at
java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
at
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
at
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at
org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
at
org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
at
org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
at
org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
at
org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
at
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
at
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
at
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
at
org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
at
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
... 9 more
Caused by: java.lang.OutOfMemoryError: Java heap space
at
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputS

Re: After OutOfMemoryError State can not be readed

2018-09-07 Thread Edward Alexander Rojas Clavijo
Hi Stefan, Vino,
Thanks for your answers.

We are using full checkpointing, not incremental. We are using custom
serializers for the operators' state classes. The serializers perform
encryption before writing and decryption when reading. The serializer is
stateless.
We register the serializers by using
env.getConfig()
  .registerTypeWithKryoSerializer(ProcessState.class,
ProcessStateSerializer.class);

In normal cases the serialization works correctly, even after recovering
from a failure. We get this error only when the taskmanager fails due to
memory problems.

Thanks again for your help,
Edward
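
To make the setup concrete, a stateless encrypting Kryo serializer of the kind described above could look roughly like this; ProcessState's toBytes/fromBytes and the encrypt/decrypt helpers are stand-ins, not the actual implementation:

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Sketch only. The important property is that the serializer keeps no mutable
// fields, so concurrent use from multiple task threads cannot corrupt it.
public class ProcessStateSerializer extends Serializer<ProcessState> {

  @Override
  public void write(Kryo kryo, Output output, ProcessState state) {
    byte[] cipher = encrypt(state.toBytes());  // assumed domain method
    output.writeInt(cipher.length);            // length prefix, then payload
    output.writeBytes(cipher);
  }

  @Override
  public ProcessState read(Kryo kryo, Input input, Class<ProcessState> type) {
    byte[] cipher = input.readBytes(input.readInt());
    return ProcessState.fromBytes(decrypt(cipher));  // assumed domain method
  }

  // Placeholders for the real cipher; both must be pure functions of the input.
  private static byte[] encrypt(byte[] plain) { return plain; }
  private static byte[] decrypt(byte[] cipher) { return cipher; }
}
```

A truncated read of the length prefix or the payload would surface exactly as the java.io.EOFException quoted below, which is why the corruption question matters here.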

On Fri, Sep 7, 2018 at 11:51, Stefan Richter (<
s.rich...@data-artisans.com>) wrote:

> Hi,
>
> what I can say is that any failures like OOMs should not corrupt
> checkpoint files, because only successfully completed checkpoints are used
> for recovery by the job manager. Just to get a bit more info, are you using
> full or incremental checkpoints? Unfortunately, it is a bit hard to say
> from the given information what the cause of the problem is. Typically,
> these problems have been observed when something was wrong with a
> serializer or a stateful serializer was used from multiple threads.
>
> Best,
> Stefan
>
> Am 07.09.2018 um 05:04 schrieb vino yang :
>
> Hi Edward,
>
> From this log: Caused by: java.io.EOFException, it seems that the state
> metadata file has been corrupted.
> But I can't confirm it; maybe Stefan knows more details. I'll ping him for
> you.
>
> Thanks, vino.
>
Edward Rojas wrote on Friday, September 7, 2018 at 1:22 AM:
>
>> Hello all,
>>
>> We are running Flink 1.5.3 on Kubernetes with RocksDB as the state
>> backend.
>> When performing some load testing we got an OutOfMemoryError: native
>> memory exhausted, causing the job to fail and be restarted.
>>
>> After the TaskManager is restarted, the job is recovered from a
>> checkpoint,
>> but there seems to be a problem when trying to access the state. We get
>> the error from the onTimer function of an onProcessingTime callback.
>>
>> Could the OOM error have caused a corrupted state to be checkpointed?
>>
>> We get Exceptions like:
>>
>> TimerException{java.lang.RuntimeException: Error while retrieving data
>> from
>> RocksDB.}
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
>> at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:277)
>> at
>>
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:191)
>> at
>>
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>> at java.lang.Thread.run(Thread.java:811)
>> Caused by: java.lang.RuntimeException: Error while retrieving data from
>> RocksDB.
>> at
>>
>> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
>> at com.xxx.ProcessFunction.onTimer(ProcessFunction.java:279)
>> at
>>
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
>> at
>>
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onProcessingTime(KeyedProcessOperator.java:78)
>> at
>>
>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>> ... 7 more
>> Caused by: java.io.EOFException
>> at java.io.DataInputStream.readFully(DataInputStream.java:208)
>> at java.io.DataInputStream.readUTF(DataInputStream.java:618)
>> at java.io.DataInputStream.readUTF(DataInputStream.java:573)
>> at
>>
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
>> at
>>
>> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:87)
>> ... 12 more
>>
>>
>> Thanks in advance for any help
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>

-- 
Edward Alexander Rojas Clavijo
Software Engineer, Hybrid Cloud
IBM France


Re: After OutOfMemoryError State can not be readed

2018-09-07 Thread Stefan Richter
Hi,

what I can say is that any failures like OOMs should not corrupt checkpoint 
files, because only successfully completed checkpoints are used for recovery by 
the job manager. Just to get a bit more info, are you using full or incremental 
checkpoints? Unfortunately, it is a bit hard to say from the given information 
what the cause of the problem is. Typically, these problems have been observed 
when something was wrong with a serializer or a stateful serializer was used 
from multiple threads.

Best,
Stefan 

> On 07.09.2018 at 05:04, vino yang wrote:
> 
> Hi Edward,
> 
> From this log: Caused by: java.io.EOFException, it seems that the state 
> metadata file has been corrupted.
> But I can't confirm it; maybe Stefan knows more details. I'll ping him for you.
> 
> Thanks, vino.
> 
> Edward Rojas <edward.roja...@gmail.com> wrote on Friday, September 7,
> 2018 at 1:22 AM:
> Hello all,
> 
> We are running Flink 1.5.3 on Kubernetes with RocksDB as the state backend. 
> When performing some load testing we got an OutOfMemoryError: native memory
> exhausted, causing the job to fail and be restarted.
> 
> After the TaskManager is restarted, the job is recovered from a checkpoint,
> but there seems to be a problem when trying to access the state. We get the
> error from the onTimer function of an onProcessingTime callback.
> 
> Could the OOM error have caused a corrupted state to be checkpointed?
> 
> We get Exceptions like:
> 
> TimerException{java.lang.RuntimeException: Error while retrieving data from
> RocksDB.}
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
> at java.util.concurrent.FutureTask.run(FutureTask.java:277)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:191)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> at java.lang.Thread.run(Thread.java:811)
> Caused by: java.lang.RuntimeException: Error while retrieving data from
> RocksDB.
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
> at com.xxx.ProcessFunction.onTimer(ProcessFunction.java:279)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onProcessingTime(KeyedProcessOperator.java:78)
> at
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
> ... 7 more
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readFully(DataInputStream.java:208)
> at java.io.DataInputStream.readUTF(DataInputStream.java:618)
> at java.io.DataInputStream.readUTF(DataInputStream.java:573)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:87)
> ... 12 more
> 
> 
> Thanks in advance for any help
> 
> 
> 
> 
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: After OutOfMemoryError State can not be readed

2018-09-06 Thread vino yang
Hi Edward,

From this log: Caused by: java.io.EOFException, it seems that the state
metadata file has been corrupted.
But I can't confirm it; maybe Stefan knows more details. I'll ping him for you.

Thanks, vino.

Edward Rojas wrote on Friday, September 7, 2018 at 1:22 AM:

> Hello all,
>
> We are running Flink 1.5.3 on Kubernetes with RocksDB as the state
> backend.
> When performing some load testing we got an OutOfMemoryError: native
> memory exhausted, causing the job to fail and be restarted.
>
> After the TaskManager is restarted, the job is recovered from a checkpoint,
> but there seems to be a problem when trying to access the state. We get
> the error from the onTimer function of an onProcessingTime callback.
>
> Could the OOM error have caused a corrupted state to be checkpointed?
>
> We get Exceptions like:
>
> TimerException{java.lang.RuntimeException: Error while retrieving data from
> RocksDB.}
> at
>
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
> at java.util.concurrent.FutureTask.run(FutureTask.java:277)
> at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:191)
> at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> at java.lang.Thread.run(Thread.java:811)
> Caused by: java.lang.RuntimeException: Error while retrieving data from
> RocksDB.
> at
>
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
> at com.xxx.ProcessFunction.onTimer(ProcessFunction.java:279)
> at
>
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
> at
>
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onProcessingTime(KeyedProcessOperator.java:78)
> at
>
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)
> at
>
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
> ... 7 more
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readFully(DataInputStream.java:208)
> at java.io.DataInputStream.readUTF(DataInputStream.java:618)
> at java.io.DataInputStream.readUTF(DataInputStream.java:573)
> at
>
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
> at
>
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:87)
> ... 12 more
>
>
> Thanks in advance for any help
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


After OutOfMemoryError State can not be readed

2018-09-06 Thread Edward Rojas
Hello all,

We are running Flink 1.5.3 on Kubernetes with RocksDB as the state backend. 
When performing some load testing we got an OutOfMemoryError: native memory
exhausted, causing the job to fail and be restarted.

After the TaskManager is restarted, the job is recovered from a checkpoint,
but there seems to be a problem when trying to access the state. We get the
error from the onTimer function of an onProcessingTime callback.

Could the OOM error have caused a corrupted state to be checkpointed?

We get Exceptions like:

TimerException{java.lang.RuntimeException: Error while retrieving data from
RocksDB.}
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
at java.util.concurrent.FutureTask.run(FutureTask.java:277)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:191)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.lang.Thread.run(Thread.java:811)
Caused by: java.lang.RuntimeException: Error while retrieving data from
RocksDB.
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
at com.xxx.ProcessFunction.onTimer(ProcessFunction.java:279)
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.onProcessingTime(KeyedProcessOperator.java:78)
at
org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
... 7 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:208)
at java.io.DataInputStream.readUTF(DataInputStream.java:618)
at java.io.DataInputStream.readUTF(DataInputStream.java:573)
at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:87)
... 12 more


Thanks in advance for any help




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink 1.6 ExecutionJobVertex.getTaskInformationOrBlobKey OutOfMemoryError

2018-08-13 Thread 杨力
Thanks for the tip! It works.

I forgot the job manager.

Hequn Cheng wrote on Tuesday, August 14, 2018 at 9:15 AM:

> Hi,
>
> Have you tried increasing the memory of the job master?
> If you run a Flink job on YARN, you can increase the job master's memory
> with "-yjm 1024m" [1].
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#run-a-flink-job-on-yarn
>
> On Mon, Aug 13, 2018 at 10:25 PM, 杨力  wrote:
>
>> I used to run Flink SQL in streaming mode with more than 70 SQL statements
>> in version 1.4. With so many statements loaded, akka.framesize has to be
>> set to 200 MB to submit the job.
>>
>> When I try to run the job with Flink 1.6.0, the HTTP-based job submission
>> works perfectly, but an OutOfMemoryError is thrown when tasks are being
>> deployed.
>>
>> java.lang.OutOfMemoryError: Java heap space
>> at java.util.Arrays.copyOf(Arrays.java:3236)
>> at
>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
>> at
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> at
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>> at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> at
>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:512)
>> at
>> org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
>> at
>> org.apache.flink.runtime.blob.BlobWriter.serializeAndTryOffload(BlobWriter.java:99)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getTaskInformationOrBlobKey(ExecutionJobVertex.java:393)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:827)
>> at
>> org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:580)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$2(ExecutionGraph.java:963)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph$$Lambda$105/800937955.accept(Unknown
>> Source)
>> at
>> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
>> at
>> java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:541)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture$$Lambda$92/1432873073.accept(Unknown
>> Source)
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
>> at akka.dispatch.OnComplete.internal(Future.scala:259)
>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>
>> This OOM error is raised even with a 12 GB heap. I have dived into the
>> source code and only found that ExecutionJobVertex.getTaskInformationOrBlobKey
>> serializes a TaskInformation object, which does not seem to be a large one.
>> Can anyone help me fix or work around the problem?
>>
>
>


Re: Flink 1.6 ExecutionJobVertex.getTaskInformationOrBlobKey OutOfMemoryError

2018-08-13 Thread Hequn Cheng
Hi,

Have you tried increasing the memory of the job master?
If you run a Flink job on YARN, you can increase the job master's memory with
"-yjm 1024m" [1].

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#run-a-flink-job-on-yarn
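
Put together, the framesize the job needs and the job master memory suggested above would amount to something like the following; the keys are from the 1.6-era docs and the values are illustrative only:

```
# flink-conf.yaml — illustrative values
akka.framesize: 209715200b   # ~200 MB, as this job's submission requires
jobmanager.heap.mb: 4096     # more heap for serializing deployment descriptors
```

On YARN the same heap increase can instead be passed at submission time, e.g. `flink run -m yarn-cluster -yjm 4096m ... job.jar`.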

On Mon, Aug 13, 2018 at 10:25 PM, 杨力  wrote:

> I used to run Flink SQL in streaming mode with more than 70 SQL statements
> in version 1.4. With so many statements loaded, akka.framesize has to be
> set to 200 MB to submit the job.
>
> When I try to run the job with Flink 1.6.0, the HTTP-based job submission
> works perfectly, but an OutOfMemoryError is thrown when tasks are being
> deployed.
>
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:512)
> at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
> at org.apache.flink.runtime.blob.BlobWriter.serializeAndTryOffload(BlobWriter.java:99)
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getTaskInformationOrBlobKey(ExecutionJobVertex.java:393)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:827)
> at org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:580)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$2(ExecutionGraph.java:963)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph$$Lambda$105/800937955.accept(Unknown Source)
> at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
> at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:541)
> at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture$$Lambda$92/1432873073.accept(Unknown Source)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
> at akka.dispatch.OnComplete.internal(Future.scala:259)
> at akka.dispatch.OnComplete.internal(Future.scala:256)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>
> This OOM error is raised even with a 12 GB heap. I have dived into the
> source code and only found that ExecutionJobVertex.getTaskInformationOrBlobKey
> serializes a TaskInformation object, which does not seem to be a large one.
> Can anyone help me fix or work around the problem?
>


Flink 1.6 ExecutionJobVertex.getTaskInformationOrBlobKey OutOfMemoryError

2018-08-13 Thread 杨力
I used to run Flink SQL in streaming mode with more than 70 SQL statements
in version 1.4. With so many statements loaded, akka.framesize has to be set
to 200 MB to submit the job.

When I try to run the job with Flink 1.6.0, the HTTP-based job submission
works perfectly, but an OutOfMemoryError is thrown when tasks are being
deployed.

java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:512)
at
org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
at
org.apache.flink.runtime.blob.BlobWriter.serializeAndTryOffload(BlobWriter.java:99)
at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getTaskInformationOrBlobKey(ExecutionJobVertex.java:393)
at
org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:827)
at
org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:580)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$2(ExecutionGraph.java:963)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph$$Lambda$105/800937955.accept(Unknown
Source)
at
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
at
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:541)
at
org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture$$Lambda$92/1432873073.accept(Unknown
Source)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:259)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)

This OOM error is raised even with a 12 GB heap. I have dived into the source
code and only found that ExecutionJobVertex.getTaskInformationOrBlobKey
serializes a TaskInformation object, which does not seem to be a large one.
Can anyone help me fix or work around the problem?


Re: OutOfMemoryError

2016-08-08 Thread Paulo Cezar
Thanks Stephan, I had a MapFunction using Unirest and that was the origin
of the leak.
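
The usual shape of that fix is to tie the HTTP client's lifecycle to the rich-function lifecycle, so its worker threads are released when the task closes. A rough sketch with the old Mashape Unirest API; the function and the endpoint are hypothetical, not Paulo's actual code:

```java
import org.apache.flink.api.common.functions.RichMapFunction;

import com.mashape.unirest.http.Unirest;

// Sketch of the leak fix: without the close() hook, every task that uses
// Unirest leaves its background threads alive, and after enough task
// (re)starts the JVM can no longer create native threads.
public class EnrichFn extends RichMapFunction<String, String> {

  @Override
  public String map(String value) throws Exception {
    // Illustrative REST lookup; the service URL is made up.
    return Unirest.get("http://enrichment-service/lookup")
        .queryString("q", value)
        .asString()
        .getBody();
  }

  @Override
  public void close() throws Exception {
    Unirest.shutdown(); // releases the client's thread pool when the task ends
  }
}
```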

On Tue, Aug 2, 2016 at 7:36 AM, Stephan Ewen  wrote:

> My guess would be that you have a thread leak in the user code.
> More memory will not solve the problem, only push it a bit further away.
>
> On Mon, Aug 1, 2016 at 9:15 PM, Paulo Cezar  wrote:
>
>> Hi folks,
>>
>>
>> I'm trying to run a DataSet program, but after around 200k records are 
>> processed a "java.lang.OutOfMemoryError: unable to create new native thread" 
>> stops me.
>>
>>
>> I'm deploying Flink (via bin/yarn-session.sh) on a YARN cluster with 10 
>> nodes (each with 8 cores) and starting 10 task managers, each with 8 slots 
>> and 6 GB of RAM.
>>
>>
>> Except for the data sink that writes to HDFS and runs with a parallelism of 
>> 1, my job runs with a parallelism of 80 and has two input datasets, each an 
>> HDFS file of around 6 GB and 20 million lines. Most of my map functions use 
>> external services via RPC or REST APIs to enrich the raw data with info from 
>> other sources.
>>
>> Am I doing something wrong, or should I really have more memory 
>> available?
>>
>> Thanks,
>> Paulo Cezar
>>
>>
>


Re: OutOfMemoryError

2016-08-02 Thread Stephan Ewen
My guess would be that you have a thread leak in the user code.
More memory will not solve the problem, only push it a bit further away.

On Mon, Aug 1, 2016 at 9:15 PM, Paulo Cezar  wrote:

> Hi folks,
>
>
> I'm trying to run a DataSet program, but after around 200k records are 
> processed a "java.lang.OutOfMemoryError: unable to create new native thread" 
> stops me.
>
>
> I'm deploying Flink (via bin/yarn-session.sh) on a YARN cluster with 10 nodes 
> (each with 8 cores) and starting 10 task managers, each with 8 slots and 6 GB 
> of RAM.
>
>
> Except for the data sink that writes to HDFS and runs with a parallelism of 
> 1, my job runs with a parallelism of 80 and has two input datasets, each an 
> HDFS file of around 6 GB and 20 million lines. Most of my map functions use 
> external services via RPC or REST APIs to enrich the raw data with info from 
> other sources.
>
> Am I doing something wrong, or should I really have more memory 
> available?
>
> Thanks,
> Paulo Cezar
>
>


OutOfMemoryError

2016-08-01 Thread Paulo Cezar
Hi folks,


I'm trying to run a DataSet program, but after around 200k records are
processed a "java.lang.OutOfMemoryError: unable to create new native
thread" stops me.


I'm deploying Flink (via bin/yarn-session.sh) on a YARN cluster with
10 nodes (each with 8 cores) and starting 10 task managers, each with
8 slots and 6 GB of RAM.


Except for the data sink that writes to HDFS and runs with a
parallelism of 1, my job runs with a parallelism of 80 and has two
input datasets, each an HDFS file of around 6 GB and 20 million lines.
Most of my map functions use external services via RPC or REST APIs
to enrich the raw data with info from other sources.

Am I doing something wrong, or should I really have more memory available?

Thanks,
Paulo Cezar


Re: OutofMemoryError: Java heap space & Loss of Taskmanager

2016-03-18 Thread Ufuk Celebi
Are you facing these issues with the batch or streaming programs?

– Ufuk

On Wed, Mar 16, 2016 at 4:30 PM, Till Rohrmann  wrote:
> If the problem is that your JVMs stall too long, then you can also increase
> the akka.ask.timeout configuration value in flink-conf.yaml. That will
> also increase the timeout for the failure detector. Or you can set the values
> for Akka's deathwatch yourself to some greater values. See the configuration
> docs [1] for a list of available options.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#distributed-coordination-via-akka
>
> Cheers,
> Till
>
>
> On Wed, Mar 16, 2016 at 4:25 PM, Ravinder Kaur  wrote:
>>
>> Hello All,
>>
>> I have been facing the loss-of-TaskManager issue again. This time the
>> JobManager and TaskManager logs showed that one of them became unreachable
>> while the job ran, with the following error.
>>
>> akka.remote.RemoteWatcher: Detected unreachable
>>
>> Much speculation brought me to
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JobManager-is-no-longer-reachable-td1821.html
>>
>> Following the suggestions by Stephan there, the job stopped failing only
>> after switching to the G1 garbage collector. But with G1 the runtime has
>> increased by 50%. How can this be explained?
>>
>> Kind Regards,
>> Ravinder Kaur
>>
>>
>>
>> On Tue, Mar 15, 2016 at 11:20 PM, Ravinder Kaur 
>> wrote:
>>>
>>> Hello All,
>>>
>>> I figured out that it was not a problem with GC stalls but with improper
>>> memory management. I increased the taskmanager.memory.fraction from 0.5 to
>>> 0.6.
>>>
>>> This solved the errors and I could run the jobs successfully.
>>>
>>> Kind Regards,
>>> Ravinder.
>>>
>>> On Tue, Mar 15, 2016 at 7:21 PM, Ravinder Kaur 
>>> wrote:

 Hi Till,

 After running a few jobs, the Taskmanagers are lost again.

 03/15/2016 18:41:45 Source: Read Text File Source -> Flat Map(8/25)
 switched to FINISHED
 03/15/2016 18:43:27 Keyed Aggregation -> Sink: Unnamed(3/25)
 switched to FAILED
 java.lang.Exception: The slot in which the task was executed has been
 released. Probably loss of TaskManager 50d28775a642f8f19834beb607f01035 @
 vm-10-155-208-138 - 4 slots - URL:
 akka.tcp://flink@10.155.208.138:32846/user/taskmanager
 at
 org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:153)
 at
 org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
 at
 org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
 at
 org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
 at
 org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
 at
 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:696)
 at
 scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
 at
 org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
 at
 scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
 at
 org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
 at
 org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
 at
 scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
 at
 org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at
 org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:100)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at
 akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
 at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
 at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
 at akka.actor.ActorCell.invoke(ActorCell.scala:486)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
 at akka.dispatch.Mailbox.run(Mailbox.scala:221)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 03/15/2016 18:43:27 Job execution switched to status FAILING.
 03/15/2016 18:43:27 Source: Read Text File Source -> Flat Map(2/25)
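
The knobs discussed back and forth in this thread all live in flink-conf.yaml; a sketch with purely illustrative values for the 0.10.x-era keys (check your version's configuration docs before copying any of these):

```
# flink-conf.yaml — illustrative values only
taskmanager.heap.mb: 4096            # with 64 GB nodes, far larger heaps are fine (Till)
taskmanager.memory.fraction: 0.5     # leave more of the heap to user code (Ravinder)
akka.ask.timeout: 100 s              # relax timeouts if JVMs stall in GC (Till)
akka.watch.heartbeat.pause: 100 s    # deathwatch tolerance, see the config docs
env.java.opts: -XX:+UseG1GC          # the GC switch that stopped the failures here
```

Whether G1's 50% runtime cost is acceptable is a separate trade-off: shorter pauses keep the deathwatch quiet, but at a throughput price.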
 

Re: OutofMemoryError: Java heap space & Loss of Taskmanager

2016-03-15 Thread Till Rohrmann
Hi Ravinder,

the log of the TM you've sent is the log of the only TM which has not been
disassociated from the JM. Could it be that you simply stopped the cluster,
which results in the disassociation events?

Normally, Flink should kill all processes. If you have some processes
lingering around, then you should kill them first.

The more memory you provide, the more data can be kept in memory. Whenever
the managed memory is full, it is spilled to disk. That's how you can also
process data which does not fit completely into memory. However, all elements
which are given to a user function are kept on the heap. If your elements
become too big or you keep too many elements on the heap, you'll see an OOM
exception. In that case it helps to increase the assigned memory or to lower
the memory fraction.

Cheers,
Till

On Tue, Mar 15, 2016 at 11:17 AM, Ravinder Kaur  wrote:

> Hi Till,
>
> Log of JobManager
>
> 09:55:31,574 WARN  org.apache.hadoop.util.NativeCodeLoader
>   - Unable to load native-hadoop library for your platform... using
> builtin-java classes where applicable
> 09:55:31,742 INFO  org.apache.flink.runtime.jobmanager.JobManager
>-
> 
> 09:55:31,742 INFO  org.apache.flink.runtime.jobmanager.JobManager
>-  Starting JobManager (Version: 0.10.1, Rev:2e9b231,
> Date:22.11.2015 @ 12:41:12 CET)
> 09:55:31,742 INFO  org.apache.flink.runtime.jobmanager.JobManager
>-  Current user: flink
> 09:55:31,742 INFO  org.apache.flink.runtime.jobmanager.JobManager
>-  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.7/24.95-b01
> 09:55:31,743 INFO  org.apache.flink.runtime.jobmanager.JobManager
>-  Maximum heap size: 246 MiBytes
> 09:55:31,743 INFO  org.apache.flink.runtime.jobmanager.JobManager
>-  JAVA_HOME: /usr/lib/jvm/java-1.7.0-openjdk-amd64
> 09:55:31,745 INFO  org.apache.flink.runtime.jobmanager.JobManager
>-  Hadoop version: 2.7.0
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>-  JVM Options:
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- -Xms256m
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- -Xmx256m
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- -XX:MaxPermSize=256m
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>-
> -Dlog.file=/home/flink/flink-0.10.1/log/flink-flink-jobmanager-0-vm-10-155-208-156.cloud.mwn.de.log
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>-
> -Dlog4j.configuration=file:/home/flink/flink-0.10.1/conf/log4j.properties
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>-
> -Dlogback.configurationFile=file:/home/flink/flink-0.10.1/conf/logback.xml
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>-  Program Arguments:
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- --configDir
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- /home/flink/flink-0.10.1/conf
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- --executionMode
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- cluster
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- --streamingMode
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- streaming
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>-  Classpath:
> /home/flink/flink-0.10.1/lib/flink-dist_2.11-0.10.1.jar:/home/flink/flink-0.10.1/lib/flink-python_2.11-0.10.1.jar:/home/flink/flink-0.10.1/lib/log4j-1.2.17.jar:/home/flink/flink-0.10.1/lib/slf4j-log4j12-1.7.7.jar:::
> 09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
>-
> 
> 09:55:31,924 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- Loading configuration from /home/flink/flink-0.10.1/conf
> 09:55:31,941 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- Staring JobManager without high-availability
> 09:55:31,950 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- Starting JobManager on 10.155.208.156:6123 with execution mode
> CLUSTER and streaming mode STREAMING
> 09:55:32,039 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- Security is not enabled. Starting non-authenticated JobManager.
> 09:55:32,039 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- Starting JobManager
> 09:55:32,040 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- Starting JobManager actor system at 10.155.208.156:6123
> 09:55:32,483 INFO  akka.event.slf4j.Slf4jLogger
>- Slf4jLogger started
> 09:55:32,564 INFO  Remoting
> 

Re: OutofMemoryError: Java heap space & Loss of Taskmanager

2016-03-15 Thread Ravinder Kaur
Hi Till,

Log of JobManager

09:55:31,574 WARN  org.apache.hadoop.util.NativeCodeLoader
  - Unable to load native-hadoop library for your platform... using
builtin-java classes where applicable
09:55:31,742 INFO  org.apache.flink.runtime.jobmanager.JobManager
 -

09:55:31,742 INFO  org.apache.flink.runtime.jobmanager.JobManager
 -  Starting JobManager (Version: 0.10.1, Rev:2e9b231, Date:22.11.2015
@ 12:41:12 CET)
09:55:31,742 INFO  org.apache.flink.runtime.jobmanager.JobManager
 -  Current user: flink
09:55:31,742 INFO  org.apache.flink.runtime.jobmanager.JobManager
 -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.7/24.95-b01
09:55:31,743 INFO  org.apache.flink.runtime.jobmanager.JobManager
 -  Maximum heap size: 246 MiBytes
09:55:31,743 INFO  org.apache.flink.runtime.jobmanager.JobManager
 -  JAVA_HOME: /usr/lib/jvm/java-1.7.0-openjdk-amd64
09:55:31,745 INFO  org.apache.flink.runtime.jobmanager.JobManager
 -  Hadoop version: 2.7.0
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 -  JVM Options:
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - -Xms256m
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - -Xmx256m
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - -XX:MaxPermSize=256m
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 -
-Dlog.file=/home/flink/flink-0.10.1/log/flink-flink-jobmanager-0-vm-10-155-208-156.cloud.mwn.de.log
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 -
-Dlog4j.configuration=file:/home/flink/flink-0.10.1/conf/log4j.properties
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 -
-Dlogback.configurationFile=file:/home/flink/flink-0.10.1/conf/logback.xml
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 -  Program Arguments:
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - --configDir
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - /home/flink/flink-0.10.1/conf
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - --executionMode
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - cluster
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - --streamingMode
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - streaming
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 -  Classpath:
/home/flink/flink-0.10.1/lib/flink-dist_2.11-0.10.1.jar:/home/flink/flink-0.10.1/lib/flink-python_2.11-0.10.1.jar:/home/flink/flink-0.10.1/lib/log4j-1.2.17.jar:/home/flink/flink-0.10.1/lib/slf4j-log4j12-1.7.7.jar:::
09:55:31,746 INFO  org.apache.flink.runtime.jobmanager.JobManager
 -

09:55:31,924 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - Loading configuration from /home/flink/flink-0.10.1/conf
09:55:31,941 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - Staring JobManager without high-availability
09:55:31,950 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - Starting JobManager on 10.155.208.156:6123 with execution mode
CLUSTER and streaming mode STREAMING
09:55:32,039 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - Security is not enabled. Starting non-authenticated JobManager.
09:55:32,039 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - Starting JobManager
09:55:32,040 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - Starting JobManager actor system at 10.155.208.156:6123
09:55:32,483 INFO  akka.event.slf4j.Slf4jLogger
 - Slf4jLogger started
09:55:32,564 INFO  Remoting
 - Starting remoting
09:55:32,730 INFO  Remoting
 - Remoting started; listening on addresses :[akka.tcp://
flink@10.155.208.156:6123]
09:55:32,731 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - Starting JobManger web frontend
09:55:32,761 INFO  org.apache.flink.runtime.webmonitor.WebRuntimeMonitor
  - Using directory /tmp/flink-web-6cd96e7e-62be-4301-9376-c98528bd58b8
for the web interface files
09:55:32,762 INFO  org.apache.flink.runtime.webmonitor.WebRuntimeMonitor
  - Serving job manager log from
/home/flink/flink-0.10.1/log/flink-flink-jobmanager-0-vm-10-155-208-156.cloud.mwn.de.log
09:55:32,762 INFO  org.apache.flink.runtime.webmonitor.WebRuntimeMonitor
  - Serving job manager stdout from
/home/flink/flink-0.10.1/log/flink-flink-jobmanager-0-vm-10-155-208-156.cloud.mwn.de.out
09:55:33,040 INFO  org.apache.flink.runtime.webmonitor.WebRuntimeMonitor
  - Web frontend listening at 0:0:0:0:0:0:0:0:8081
09:55:33,041 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - Starting JobManager actor
09:55:33,046 INFO  org.apache.flink.runtime.blob.BlobServer
 - Cr

Re: OutofMemoryError: Java heap space & Loss of Taskmanager

2016-03-15 Thread Till Rohrmann
Hi Ravinder,

this should not be the relevant log extract. The log says that the TM is
started on port 49653, while the JM log says that the TM on port 4 is
lost. Would you mind sharing the complete JM and TM logs with us?

Cheers,
Till

On Tue, Mar 15, 2016 at 10:54 AM, Ravinder Kaur  wrote:

> Hello Ufuk,
>
> Yes, the same WordCount program is being run.
>
> Kind Regards,
> Ravinder Kaur
>
> On Tue, Mar 15, 2016 at 10:45 AM, Ufuk Celebi  wrote:
>
>> What do you mean by iteration in this context? Are you repeatedly
>> running the same WordCount program for streaming and batch
>> respectively?
>>
>> – Ufuk
>>
>> On Tue, Mar 15, 2016 at 10:22 AM, Till Rohrmann 
>> wrote:
>> > Hi Ravinder,
>> >
>> > could you tell us what's written in the log of the failing
>> > taskmanager? There should be some kind of failure explaining why the
>> > taskmanager stopped working.
>> >
>> > Moreover, given that you have 64 GB of main memory, you could easily
>> give
>> > 50GB as heap memory to each taskmanager.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Tue, Mar 15, 2016 at 9:48 AM, Ravinder Kaur 
>> wrote:
>> >>
>> >> Hello All,
>> >>
>> >> I'm running a simple word count example using the quickstart package
>> >> from Flink (0.10.1), on an input dataset of 500 MB. This dataset is a
>> >> set of randomly generated words of length 8.
>> >>
>> >> Cluster Configuration:
>> >>
>> >> Number of machines: 7
>> >> Total cores: 25
>> >> Memory on each: 64 GB
>> >>
>> >> I'm interested in comparing the performance of the batch and streaming
>> >> modes, so I'm running the WordCount example with a number of iterations
>> >> (max 10) on datasets with sizes ranging between 100 MB and 50 GB,
>> >> consisting of random words of length 4 and 8.
>> >>
>> >> When I ran the experiments in batch mode all iterations ran fine, but
>> >> now I'm stuck in streaming mode at this:
>> >>
>> >> Caused by: java.lang.OutOfMemoryError: Java heap space
>> >> at java.util.HashMap.resize(HashMap.java:580)
>> >> at java.util.HashMap.addEntry(HashMap.java:879)
>> >> at java.util.HashMap.put(HashMap.java:505)
>> >> at
>> >>
>> org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59)
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> >> at java.lang.Thread.run(Thread.java:745)
>> >>
>> >> I investigated and found 2 solutions: (1) increasing
>> >> taskmanager.heap.mb and (2) reducing taskmanager.memory.fraction.
>> >>
>> >> Therefore I set taskmanager.heap.mb: 1024 and
>> >> taskmanager.memory.fraction: 0.5 (default 0.7).
>> >>
>> >> When I ran the example with these settings I lose taskmanagers one by
>> >> one during job execution with the following cause
>> >>
>> >> Caused by: java.lang.Exception: The slot in which the task was executed
>> >> has been released. Probably loss of TaskManager
>> >> 831a72dad6fbb533b193820f45bdc5bc @ vm-10-155-208-138 - 4 slots - URL:
>> >> akka.tcp://flink@10.155.208.138:4/user/taskmanager
>> >> at
>> >>
>> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:153)
>> >> at
>> >>
>> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
>> >> at
>> >>
>> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
>> >> at
>> >> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
>> >> at
>> >>
>> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
>> >> at
>> >>
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:696)
>> >> at
>> >>
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> >> at
>> >>
>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>> >> at
>> >>
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> >> at
>> >>
>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>> >> at
>> >>
>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>> >> at
>> >> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> >> at
>> >>
>> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>> >> at 

Re: OutofMemoryError: Java heap space & Loss of Taskmanager

2016-03-15 Thread Ravinder Kaur
Hello Ufuk,

Yes, the same WordCount program is being run.

Kind Regards,
Ravinder Kaur

On Tue, Mar 15, 2016 at 10:45 AM, Ufuk Celebi  wrote:

> What do you mean by iteration in this context? Are you repeatedly
> running the same WordCount program for streaming and batch
> respectively?
>
> – Ufuk
>
> On Tue, Mar 15, 2016 at 10:22 AM, Till Rohrmann 
> wrote:
> > Hi Ravinder,
> >
> > could you tell us what's written in the taskmanager log of the failing
> > taskmanager? It should contain some indication of why the taskmanager
> > stopped working.
> >
> > Moreover, given that you have 64 GB of main memory, you could easily give
> > 50GB as heap memory to each taskmanager.
> >
> > Cheers,
> > Till
> >
> > On Tue, Mar 15, 2016 at 9:48 AM, Ravinder Kaur 
> wrote:
> >>
> >> Hello All,
> >>
> >> I'm running a simple word count example using the quickstart package
> >> from Flink (0.10.1) on an input dataset of 500MB. This dataset is a set
> >> of randomly generated words of length 8.
> >>
> >> Cluster Configuration:
> >>
> >> Number of machines: 7
> >> Total cores: 25
> >> Memory on each: 64GB
> >>
> >> I'm interested in comparing the performance of the Batch and Streaming
> >> modes, and so I'm running the WordCount example with a number of
> >> iterations (max 10) on datasets ranging in size between 100MB and 50GB,
> >> consisting of random words of length 4 and 8.
> >>
> >> When I ran the experiments in Batch mode, all iterations ran fine, but
> >> now I'm stuck in Streaming mode at this:
> >>
> >> Caused by: java.lang.OutOfMemoryError: Java heap space
> >> at java.util.HashMap.resize(HashMap.java:580)
> >> at java.util.HashMap.addEntry(HashMap.java:879)
> >> at java.util.HashMap.put(HashMap.java:505)
> >> at
> >>
> org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59)
> >> at
> >>
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> >> at java.lang.Thread.run(Thread.java:745)
> >>
> >> I investigated and found 2 possible solutions: (1) increasing
> >> taskmanager.heap.mb and (2) reducing taskmanager.memory.fraction.
> >>
> >> Therefore I set taskmanager.heap.mb: 1024 and
> >> taskmanager.memory.fraction: 0.5 (default 0.7).
> >>
> >> When I ran the example with this setting, I lost taskmanagers one by one
> >> during the job execution, with the following cause:
> >>
> >> Caused by: java.lang.Exception: The slot in which the task was executed
> >> has been released. Probably loss of TaskManager
> >> 831a72dad6fbb533b193820f45bdc5bc @ vm-10-155-208-138 - 4 slots - URL:
> >> akka.tcp://flink@10.155.208.138:4/user/taskmanager
> >> at
> >>
> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:153)
> >> at
> >>
> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
> >> at
> >>
> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
> >> at
> >> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
> >> at
> >>
> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
> >> at
> >>
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:696)
> >> at
> >>
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> >> at
> >>
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> >> at
> >>
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> >> at
> >> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> >> at
> >> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> >> at
> >> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> >> at
> >>
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> >> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> >> at
> >>
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:100)
> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> >> at
> >>
> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
> >> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
> >> at akka.actor.ActorCell.aut

Re: OutofMemoryError: Java heap space & Loss of Taskmanager

2016-03-15 Thread Ravinder Kaur
Hi Till,

The following is the log file of one of the taskmanagers:

09:55:37,071 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils
 - Trying to select the network interface and address to use by
connecting to the leading JobManager.
09:55:37,072 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils
 - TaskManager will try to connect for 1 milliseconds before
falling back to heuristics
09:55:37,075 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Retrieved new target address /10.155.208.156:6123.
09:55:37,084 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - TaskManager will use hostname/address 'vm-10-155-208-138.cloud.mwn.de'
(10.155.208.138) for communication.
09:55:37,085 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Starting TaskManager in streaming mode STREAMING
09:55:37,085 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Starting TaskManager actor system at 10.155.208.138:0
09:55:37,531 INFO  akka.event.slf4j.Slf4jLogger
 - Slf4jLogger started
09:55:37,587 INFO  Remoting
 - Starting remoting
09:55:37,774 INFO  Remoting
 - Remoting started; listening on addresses :[akka.tcp://
flink@10.155.208.138:49653]
09:55:37,782 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Starting TaskManager actor
09:55:37,798 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig
  - NettyConfig [server address:
vm-10-155-208-138.cloud.mwn.de/10.155.208.138, server port: 32798, memory
segment size (bytes): 32768, transport type: NIO, number of server threads:
0 (use Netty's default), number of client threads: 0 (use Netty's default),
server connect backlog: 0 (use Netty's default), client connect timeout
(sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
09:55:37,803 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Messages between TaskManager and JobManager have a max timeout of
10 milliseconds
09:55:37,811 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Temporary file directory '/tmp': total 4 GB, usable 0 GB (0.00%
usable)
09:55:37,848 INFO
 org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated
64 MB for network buffer pool (number of memory segments: 2048, bytes per
segment: 32768).
09:55:37,955 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Using 0.5 of the currently free heap space for Flink managed heap
memory (455 MB).
09:55:37,978 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager
 - I/O manager uses directory
/tmp/flink-io-3b11098e-a3ea-4a8a-8ea4-c3f1c5b13d6f for spill files.
09:55:37,986 INFO  org.apache.flink.runtime.filecache.FileCache
 - User file cache uses directory
/tmp/flink-dist-cache-516dd09a-1dfe-46eb-b50b-b6e24b6e9fad
09:55:38,146 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Starting TaskManager actor at akka://flink/user/taskmanager#56985599.
09:55:38,146 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - TaskManager data connection information:
vm-10-155-208-138.cloud.mwn.de (dataPort=32798)
09:55:38,147 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - TaskManager has 4 task slot(s).
09:55:38,148 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Memory usage stats: [HEAP: 100/990/990 MB, NON HEAP: 24/37/304 MB
(used/committed/max)]
09:55:38,151 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Trying to register at JobManager akka.tcp://
flink@10.155.208.156:6123/user/jobmanager (attempt 1, timeout: 500
milliseconds)
09:55:38,301 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Successful registration at JobManager (akka.tcp://
flink@10.155.208.156:6123/user/jobmanager), starting network stack and
library cache.
09:55:38,479 INFO  org.apache.flink.runtime.io.network.netty.NettyClient
  - Successful initialization (took 55 ms).
09:55:38,533 INFO  org.apache.flink.runtime.io.network.netty.NettyServer
  - Successful initialization (took 54 ms). Listening on SocketAddress /
10.155.208.138:32798.
09:55:38,534 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Determined BLOB server address to be /10.155.208.156:59504. Starting
BLOB cache.
09:55:38,536 INFO  org.apache.flink.runtime.blob.BlobCache
  - Created BLOB cache storage directory
/tmp/blobStore-8e88302d-3303-4c80-8613-f0be13911fb2
09:56:48,371 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager
 - I/O manager removed spill file directory
/tmp/flink-io-3b11098e-a3ea-4a8a-8ea4-c3f1c5b13d6f
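
As a back-of-the-envelope cross-check (rounded figures read off this log, not
an exact accounting), the numbers are consistent with the taskmanager.heap.mb:
1024 and taskmanager.memory.fraction: 0.5 settings described earlier in the
thread:

```
max heap            ~  990 MB   ("HEAP: 100/990/990 MB", i.e. -Xmx from taskmanager.heap.mb: 1024)
network buffer pool =   64 MB   (2048 segments x 32768 bytes)
managed memory      ~  455 MB   (0.5 x ~910 MB of heap still free at startup)
```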

Kind Regards,
Ravinder

On Tue, Mar 15, 2016 at 10:22 AM, Till Rohrmann 
wrote:

> Hi Ravinder,
>
> could you tell us what's written in the taskmanager log of the failing
> taskmanager? It should contain some indication of why the taskmanager
> stopped working.
>
> Moreover, given that you have 64 GB of main memory, you could easily give
> 50GB as heap memory to each taskmanager.
>
> Cheers,
> Till
>
> On Tue, Mar 15, 2016 at 9:48 AM, Ravinder Kaur 
>

Re: OutofMemoryError: Java heap space & Loss of Taskmanager

2016-03-15 Thread Ufuk Celebi
What do you mean by iteration in this context? Are you repeatedly
running the same WordCount program for streaming and batch
respectively?

– Ufuk

On Tue, Mar 15, 2016 at 10:22 AM, Till Rohrmann  wrote:
> Hi Ravinder,
>
> could you tell us what's written in the taskmanager log of the failing
> taskmanager? It should contain some indication of why the taskmanager
> stopped working.
>
> Moreover, given that you have 64 GB of main memory, you could easily give
> 50GB as heap memory to each taskmanager.
>
> Cheers,
> Till
>
> On Tue, Mar 15, 2016 at 9:48 AM, Ravinder Kaur  wrote:
>>
>> Hello All,
>>
>> I'm running a simple word count example using the quickstart package from
>> Flink (0.10.1) on an input dataset of 500MB. This dataset is a set of
>> randomly generated words of length 8.
>>
>> Cluster Configuration:
>>
>> Number of machines: 7
>> Total cores: 25
>> Memory on each: 64GB
>>
>> I'm interested in comparing the performance of the Batch and Streaming
>> modes, and so I'm running the WordCount example with a number of iterations
>> (max 10) on datasets ranging in size between 100MB and 50GB, consisting of
>> random words of length 4 and 8.
>>
>> When I ran the experiments in Batch mode, all iterations ran fine, but now
>> I'm stuck in Streaming mode at this:
>>
>> Caused by: java.lang.OutOfMemoryError: Java heap space
>> at java.util.HashMap.resize(HashMap.java:580)
>> at java.util.HashMap.addEntry(HashMap.java:879)
>> at java.util.HashMap.put(HashMap.java:505)
>> at
>> org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
>> at
>> org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59)
>> at
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> I investigated and found 2 possible solutions: (1) increasing
>> taskmanager.heap.mb and (2) reducing taskmanager.memory.fraction.
>>
>> Therefore I set taskmanager.heap.mb: 1024 and taskmanager.memory.fraction:
>> 0.5 (default 0.7).
>>
>> When I ran the example with this setting, I lost taskmanagers one by one
>> during the job execution, with the following cause:
>>
>> Caused by: java.lang.Exception: The slot in which the task was executed
>> has been released. Probably loss of TaskManager
>> 831a72dad6fbb533b193820f45bdc5bc @ vm-10-155-208-138 - 4 slots - URL:
>> akka.tcp://flink@10.155.208.138:4/user/taskmanager
>> at
>> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:153)
>> at
>> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
>> at
>> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
>> at
>> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
>> at
>> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:696)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at
>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at
>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>> at
>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>> at
>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at
>> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:100)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at
>> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:486)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>> at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.co

Re: OutofMemoryError: Java heap space & Loss of Taskmanager

2016-03-15 Thread Till Rohrmann
Hi Ravinder,

could you tell us what's written in the taskmanager log of the failing
taskmanager? It should contain some indication of why the taskmanager
stopped working.

Moreover, given that you have 64 GB of main memory, you could easily give
50GB as heap memory to each taskmanager.

Cheers,
Till
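
For concreteness, Till's suggestion would look roughly like this in
flink-conf.yaml (a sketch only; the exact value is illustrative and should
leave headroom for the OS and other processes on the 64 GB machines):

```
taskmanager.heap.mb: 51200   # ~50 GB of JVM heap per TaskManager
```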

On Tue, Mar 15, 2016 at 9:48 AM, Ravinder Kaur  wrote:

> Hello All,
>
> I'm running a simple word count example using the quickstart package from
> Flink (0.10.1) on an input dataset of 500MB. This dataset is a set of
> randomly generated words of length 8.
>
> Cluster Configuration:
>
> Number of machines: 7
> Total cores: 25
> Memory on each: 64GB
>
> I'm interested in comparing the performance of the Batch and Streaming
> modes, and so I'm running the WordCount example with a number of iterations
> (max 10) on datasets ranging in size between 100MB and 50GB, consisting of
> random words of length 4 and 8.
>
> When I ran the experiments in Batch mode, all iterations ran fine, but now
> I'm stuck in Streaming mode at this:
>
> Caused by: java.lang.OutOfMemoryError: Java heap space
> at java.util.HashMap.resize(HashMap.java:580)
> at java.util.HashMap.addEntry(HashMap.java:879)
> at java.util.HashMap.put(HashMap.java:505)
> at
> org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
> at
> org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>
> I investigated and found 2 possible solutions: (1) increasing
> taskmanager.heap.mb and (2) reducing taskmanager.memory.fraction.
>
> Therefore I set taskmanager.heap.mb: 1024 and taskmanager.memory.fraction:
> 0.5 (default 0.7).
>
> When I ran the example with this setting, I lost taskmanagers one by one
> during the job execution, with the following cause:
>
> Caused by: java.lang.Exception: The slot in which the task was executed
> has been released. Probably loss of TaskManager
> 831a72dad6fbb533b193820f45bdc5bc @ vm-10-155-208-138 - 4 slots - URL:
> akka.tcp://flink@10.155.208.138:4/user/taskmanager
> at
> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:153)
> at
> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
> at
> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
> at
> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
> at
> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:696)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:100)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at
> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
> at akka.actor.ActorCell.invoke(ActorCell.scala:486)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> ... 2 more
>
>
> When I look at the results generated at each taskmanager, they are fine.
> The logs also don't show any cause for the job to be cancelled.
>
>
> Could anyone kindly guide me here?
>
> Kind Regards,
> Ravinder Kaur.

OutofMemoryError: Java heap space & Loss of Taskmanager

2016-03-15 Thread Ravinder Kaur
Hello All,

I'm running a simple word count example using the quickstart package from
Flink (0.10.1) on an input dataset of 500MB. This dataset is a set of
randomly generated words of length 8.

Cluster Configuration:

Number of machines: 7
Total cores: 25
Memory on each: 64GB

I'm interested in comparing the performance of the Batch and Streaming modes,
and so I'm running the WordCount example with a number of iterations (max 10)
on datasets ranging in size between 100MB and 50GB, consisting of random words
of length 4 and 8.

When I ran the experiments in Batch mode, all iterations ran fine, but now
I'm stuck in Streaming mode at this:

Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.HashMap.resize(HashMap.java:580)
at java.util.HashMap.addEntry(HashMap.java:879)
at java.util.HashMap.put(HashMap.java:505)
at
org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
at
org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
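
The frames above point at keyed streaming state: StreamGroupedReduce keeps a
running aggregate per key in a heap-backed KvState, i.e. one HashMap entry per
distinct word on the TaskManager heap, so a high-cardinality random dataset
grows that map with every new word. As a hypothetical sketch of a streaming
WordCount with this shape (written against the 0.10-era DataStream API; the
input and output paths are placeholders, and this is not the poster's actual
code):

```
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.readTextFile("hdfs:///path/to/input")   // placeholder input path
           .flatMap(new Tokenizer())                // emit (word, 1) pairs
           .keyBy(0)                                // partition the stream by word
           .sum(1)    // one running count per distinct word, held in heap state
           .writeAsText("hdfs:///path/to/output");  // placeholder output path

        env.execute("Streaming WordCount");
    }

    // Splits each line into (word, 1) tuples.
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }
    }
}
```

With random words of length 8 the key space is enormous, so this per-key state
keeps growing; lowering taskmanager.memory.fraction can only delay such an
OOM, not prevent it.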

I investigated and found 2 possible solutions: (1) increasing
taskmanager.heap.mb and (2) reducing taskmanager.memory.fraction.

Therefore I set taskmanager.heap.mb: 1024 and taskmanager.memory.fraction:
0.5 (default 0.7).
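
In flink-conf.yaml terms, the configuration described above is (a sketch of
the settings as stated, not the poster's actual file):

```
taskmanager.heap.mb: 1024          # total TaskManager JVM heap, in MB
taskmanager.memory.fraction: 0.5   # share of heap for Flink managed memory (default 0.7)
```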

When I ran the example with this setting, I lost taskmanagers one by one
during the job execution, with the following cause:

Caused by: java.lang.Exception: The slot in which the task was executed has
been released. Probably loss of TaskManager
831a72dad6fbb533b193820f45bdc5bc @ vm-10-155-208-138 - 4 slots - URL:
akka.tcp://flink@10.155.208.138:4/user/taskmanager
at
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:153)
at
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
at
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
at
org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
at
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:696)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:100)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at
akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
at akka.actor.ActorCell.invoke(ActorCell.scala:486)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
... 2 more


When I look at the results generated at each taskmanager, they are fine.
The logs also don't show any cause for the job to be cancelled.


Could anyone kindly guide me here?

Kind Regards,
Ravinder Kaur.


Re: OutOfMemoryError in netty local transport

2015-10-01 Thread Maximilian Michels
Great to hear :)

On Thu, Oct 1, 2015 at 11:21 AM, Robert Schmidtke
 wrote:
> I pulled the current master branch and rebuilt Flink completely anyway.
> Works like a charm.
>
> On Thu, Oct 1, 2015 at 11:11 AM, Maximilian Michels  wrote:
>>
>> By the way, you might have to use the "-U" flag to force Maven to
>> update its dependencies:  mvn -U clean install -DskipTests
>>
>> On Thu, Oct 1, 2015 at 10:19 AM, Robert Schmidtke
>>  wrote:
>> > Sweet! I'll pull it straight away. Thanks!
>> >
>> > On Thu, Oct 1, 2015 at 10:18 AM, Maximilian Michels 
>> > wrote:
>> >>
>> >> Hi Robert,
>> >>
>> >> Just a quick update: The issue has been resolved in the latest Maven
>> >> 0.10-SNAPSHOT dependency.
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >> On Wed, Sep 30, 2015 at 3:19 PM, Robert Schmidtke
>> >>  wrote:
>> >> > Hi Max,
>> >> >
>> >> > thanks for your quick reply. I found the relevant code and commented
>> >> > it out for testing; it seems to be working. Happily waiting for the
>> >> > fix. Thanks again.
>> >> >
>> >> > Robert
>> >> >
>> >> > On Wed, Sep 30, 2015 at 1:42 PM, Maximilian Michels 
>> >> > wrote:
>> >> >>
>> >> >> Hi Robert,
>> >> >>
>> >> >> This is a regression on the current master due to changes in the way
>> >> >> Flink calculates the memory and sets the maximum direct memory size.
>> >> >> We introduced these changes when we merged support for off-heap
>> >> >> memory. This is not a problem in the way Flink deals with managed
>> >> >> memory; it is just that -XX:MaxDirectMemorySize is set too low. By default
>> >> >> the maximum direct memory is only used by the network stack. The network
>> >> >> library we use allocates more direct memory than we expected.
>> >> >>
>> >> >> We'll push a fix to the master as soon as possible. Thank you for
>> >> >> reporting and thanks for your patience.
>> >> >>
>> >> >> Best regards,
>> >> >> Max
>> >> >>
>> >> >> On Wed, Sep 30, 2015 at 1:31 PM, Robert Schmidtke
>> >> >>  wrote:
>> >> >> > Hi everyone,
>> >> >> >
>> >> >> > I'm constantly running into OutOfMemoryErrors and for the life of
>> >> >> > me
>> >> >> > I
>> >> >> > cannot figure out what's wrong. Let me describe my setup. I'm
>> >> >> > running
>> >> >> > the
>> >> >> > current master branch of Flink on YARN (Hadoop 2.7.0). My job is
>> >> >> > an
>> >> >> > unfinished implementation of TPC-H Q2
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > (https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java),
>> >> >> > which I run on 8 machines (1 for JM, the other 7 for TMs) with 64G of
>> >> >> > memory
>> >> >> > per
>> >> >> > machine. This is what I believe to be the relevant section of my
>> >> >> > yarn_site.xml:
>> >> >> >
>> >> >> >
>> >> >> > <property>
>> >> >> >   <name>yarn.nodemanager.resource.memory-mb</name>
>> >> >> >   <value>57344</value>
>> >> >> > </property>
>> >> >> >
>> >> >> > <property>
>> >> >> >   <name>yarn.scheduler.maximum-allocation-mb</name>
>> >> >> >   <value>55296</value>
>> >> >> > </property>
>> >> >> >
>> >> >> > <property>
>> >> >> >   <name>yarn.nodemanager.vmem-check-enabled</name>
>> >> >> >   <value>false</value>
>> >> >> > </property>
>> >> >> >
>> >> >> >
>> >> >> > And this is how I submit the job:
>> >> >> >
>> >> >> >
>> >> >> > $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768
>> >> >> > -yn 7
>> >> >> > .
>> >> >> >
>> >> >> >
>> >> >> > The TMs happily report:
>> >> >> >
>> >> >> > .
>> >> >> > 11:50:15,577 INFO
>> >> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> >> > -  JVM Options:
>> >> >> > 11:50:15,577 INFO
>> >> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> >> > - -Xms24511m
>> >> >> > 11:50:15,577 INFO
>> >> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> >> > - -Xmx24511m
>> >> >> > 11:50:15,577 INFO
>> >> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> >> > - -XX:MaxDirectMemorySize=65m
>> >> >> > .
>> >> >> >
>> >> >> >
>> >> >> > I've tried various combinations of YARN and Flink options, to no
>> >> >> > avail.
>> >> >> > I
>> >> >> > always end up with the following stacktrace:
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> >> >> > java.lang.OutOfMemoryError: Direct buffer memory
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> >> >> > at
>> >> >> >
>> >> >>

Re: OutOfMemoryError in netty local transport

2015-10-01 Thread Robert Schmidtke
I pulled the current master branch and rebuilt Flink completely anyway.
Works like a charm.

On Thu, Oct 1, 2015 at 11:11 AM, Maximilian Michels  wrote:

> By the way, you might have to use the "-U" flag to force Maven to
> update its dependencies:  mvn -U clean install -DskipTests
>
> On Thu, Oct 1, 2015 at 10:19 AM, Robert Schmidtke
>  wrote:
> > Sweet! I'll pull it straight away. Thanks!
> >
> > On Thu, Oct 1, 2015 at 10:18 AM, Maximilian Michels 
> wrote:
> >>
> >> Hi Robert,
> >>
> >> Just a quick update: The issue has been resolved in the latest Maven
> >> 0.10-SNAPSHOT dependency.
> >>
> >> Cheers,
> >> Max
> >>
> >> On Wed, Sep 30, 2015 at 3:19 PM, Robert Schmidtke
> >>  wrote:
> >> > Hi Max,
> >> >
> >> > thanks for your quick reply. I found the relevant code and commented
> >> > it out for testing; it seems to be working. Happily waiting for the
> >> > fix. Thanks again.
> >> >
> >> > Robert
> >> >
> >> > On Wed, Sep 30, 2015 at 1:42 PM, Maximilian Michels 
> >> > wrote:
> >> >>
> >> >> Hi Robert,
> >> >>
> >> >> This is a regression on the current master due to changes in the way
> >> >> Flink calculates the memory and sets the maximum direct memory size.
> >> >> We introduced these changes when we merged support for off-heap
> >> >> memory. This is not a problem in the way Flink deals with managed
> >> >> memory; it is just that -XX:MaxDirectMemorySize is set too low. By default
> >> >> the maximum direct memory is only used by the network stack. The network
> >> >> library we use allocates more direct memory than we expected.
> >> >>
> >> >> We'll push a fix to the master as soon as possible. Thank you for
> >> >> reporting and thanks for your patience.
> >> >>
> >> >> Best regards,
> >> >> Max
> >> >>
> >> >> On Wed, Sep 30, 2015 at 1:31 PM, Robert Schmidtke
> >> >>  wrote:
> >> >> > Hi everyone,
> >> >> >
> >> >> > I'm constantly running into OutOfMemoryErrors and for the life of
> me
> >> >> > I
> >> >> > cannot figure out what's wrong. Let me describe my setup. I'm
> running
> >> >> > the
> >> >> > current master branch of Flink on YARN (Hadoop 2.7.0). My job is an
> >> >> > unfinished implementation of TPC-H Q2
> >> >> >
> >> >> >
> >> >> > (
> https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java
> ),
> >> >> > which I run on 8 machines (1 for JM, the other 7 for TMs) with 64G of
> >> >> > memory
> >> >> > per
> >> >> > machine. This is what I believe to be the relevant section of my
> >> >> > yarn_site.xml:
> >> >> >
> >> >> >
> >> >> > <property>
> >> >> >   <name>yarn.nodemanager.resource.memory-mb</name>
> >> >> >   <value>57344</value>
> >> >> > </property>
> >> >> >
> >> >> > <property>
> >> >> >   <name>yarn.scheduler.maximum-allocation-mb</name>
> >> >> >   <value>55296</value>
> >> >> > </property>
> >> >> >
> >> >> > <property>
> >> >> >   <name>yarn.nodemanager.vmem-check-enabled</name>
> >> >> >   <value>false</value>
> >> >> > </property>
> >> >> >
> >> >> >
> >> >> > And this is how I submit the job:
> >> >> >
> >> >> >
> >> >> > $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768
> -yn 7
> >> >> > .
> >> >> >
> >> >> >
> >> >> > The TMs happily report:
> >> >> >
> >> >> > .
> >> >> > 11:50:15,577 INFO
> >> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> >> >> > -  JVM Options:
> >> >> > 11:50:15,577 INFO
> >> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> >> >> > - -Xms24511m
> >> >> > 11:50:15,577 INFO
> >> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> >> >> > - -Xmx24511m
> >> >> > 11:50:15,577 INFO
> >> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> >> >> > - -XX:MaxDirectMemorySize=65m
> >> >> > .
> >> >> >
> >> >> >
> >> >> > I've tried various combinations of YARN and Flink options, to no
> >> >> > avail.
> >> >> > I
> >> >> > always end up with the following stacktrace:
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> >> >> > java.lang.OutOfMemoryError: Direct buffer memory
> >> >> > at
> >> >> >
> >> >> >
> >> >> >
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
> >> >> > at
> >> >> >
> >> >> >
> >> >> >
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> >> >> > at
> >> >> >
> >> >> >
> >> >> >
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
> >> >> > at
> >> >> >
> >> >> >
> >> >> >
> io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> >> >> > at
> >> >> >
> >> >> >
> >> >> >
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> >> >> > at
> >> >> >
> >> >> >
> >> >> >
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
> >> >> > at
> >> >> >
> >> >> >
> >> >> >

Re: OutOfMemoryError in netty local transport

2015-10-01 Thread Maximilian Michels
By the way, you might have to use the "-U" flag to force Maven to
update its dependencies:  mvn -U clean install -DskipTests

On Thu, Oct 1, 2015 at 10:19 AM, Robert Schmidtke
 wrote:
> Sweet! I'll pull it straight away. Thanks!
>
> On Thu, Oct 1, 2015 at 10:18 AM, Maximilian Michels  wrote:
>>
>> Hi Robert,
>>
>> Just a quick update: The issue has been resolved in the latest Maven
>> 0.10-SNAPSHOT dependency.
>>
>> Cheers,
>> Max
>>
>> On Wed, Sep 30, 2015 at 3:19 PM, Robert Schmidtke
>>  wrote:
>> > Hi Max,
>> >
>> > thanks for your quick reply. I found the relevant code and commented it
>> > out for testing; it seems to be working. Happily waiting for the fix.
>> > Thanks again.
>> >
>> > Robert
>> >
>> > On Wed, Sep 30, 2015 at 1:42 PM, Maximilian Michels 
>> > wrote:
>> >>
>> >> Hi Robert,
>> >>
>> >> This is a regression on the current master due to changes in the way
>> >> Flink calculates the memory and sets the maximum direct memory size.
>> >> We introduced these changes when we merged support for off-heap
>> >> memory. This is not a problem in the way Flink deals with managed
>> >> memory; it is just that -XX:MaxDirectMemorySize is set too low. By default
>> >> the maximum direct memory is only used by the network stack. The network
>> >> library we use allocates more direct memory than we expected.
>> >>
>> >> We'll push a fix to the master as soon as possible. Thank you for
>> >> reporting and thanks for your patience.
>> >>
>> >> Best regards,
>> >> Max
>> >>
>> >> On Wed, Sep 30, 2015 at 1:31 PM, Robert Schmidtke
>> >>  wrote:
>> >> > Hi everyone,
>> >> >
>> >> > I'm constantly running into OutOfMemoryErrors and for the life of me
>> >> > I
>> >> > cannot figure out what's wrong. Let me describe my setup. I'm running
>> >> > the
>> >> > current master branch of Flink on YARN (Hadoop 2.7.0). My job is an
>> >> > unfinished implementation of TPC-H Q2
>> >> >
>> >> >
>> >> > (https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java),
>> >> > which I run on 8 machines (1 for JM, the other 7 for TMs) with 64G of
>> >> > memory
>> >> > per
>> >> > machine. This is what I believe to be the relevant section of my
>> >> > yarn_site.xml:
>> >> >
>> >> >
>> >> > <property>
>> >> >   <name>yarn.nodemanager.resource.memory-mb</name>
>> >> >   <value>57344</value>
>> >> > </property>
>> >> >
>> >> > <property>
>> >> >   <name>yarn.scheduler.maximum-allocation-mb</name>
>> >> >   <value>55296</value>
>> >> > </property>
>> >> >
>> >> > <property>
>> >> >   <name>yarn.nodemanager.vmem-check-enabled</name>
>> >> >   <value>false</value>
>> >> > </property>
>> >> >
>> >> >
>> >> > And this is how I submit the job:
>> >> >
>> >> >
>> >> > $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7
>> >> > .
>> >> >
>> >> >
>> >> > The TMs happily report:
>> >> >
>> >> > .
>> >> > 11:50:15,577 INFO
>> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> > -  JVM Options:
>> >> > 11:50:15,577 INFO
>> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> > - -Xms24511m
>> >> > 11:50:15,577 INFO
>> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> > - -Xmx24511m
>> >> > 11:50:15,577 INFO
>> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> > - -XX:MaxDirectMemorySize=65m
>> >> > .
>> >> >
>> >> >
>> >> > I've tried various combinations of YARN and Flink options, to no
>> >> > avail.
>> >> > I
>> >> > always end up with the following stacktrace:
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> >> > java.lang.OutOfMemoryError: Direct buffer memory
>> >> > at
>> >> >
>> >> >
>> >> > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.AbstractChannelHandlerContext.inv

Re: OutOfMemoryError in netty local transport

2015-10-01 Thread Robert Schmidtke
Sweet! I'll pull it straight away. Thanks!

On Thu, Oct 1, 2015 at 10:18 AM, Maximilian Michels  wrote:

> Hi Robert,
>
> Just a quick update: The issue has been resolved in the latest Maven
> 0.10-SNAPSHOT dependency.
>
> Cheers,
> Max
>
> On Wed, Sep 30, 2015 at 3:19 PM, Robert Schmidtke
>  wrote:
> > Hi Max,
> >
> > thanks for your quick reply. I found the relevant code and commented it
> > out for testing; it seems to be working. Happily waiting for the fix.
> > Thanks again.
> >
> > Robert
> >
> > On Wed, Sep 30, 2015 at 1:42 PM, Maximilian Michels 
> wrote:
> >>
> >> Hi Robert,
> >>
> >> This is a regression on the current master due to changes in the way
> >> Flink calculates the memory and sets the maximum direct memory size.
> >> We introduced these changes when we merged support for off-heap
> >> memory. This is not a problem in the way Flink deals with managed
> >> memory; it is just that -XX:MaxDirectMemorySize is set too low. By default
> >> the maximum direct memory is only used by the network stack. The network
> >> library we use allocates more direct memory than we expected.
> >>
> >> We'll push a fix to the master as soon as possible. Thank you for
> >> reporting and thanks for your patience.
> >>
> >> Best regards,
> >> Max
> >>
> >> On Wed, Sep 30, 2015 at 1:31 PM, Robert Schmidtke
> >>  wrote:
> >> > Hi everyone,
> >> >
> >> > I'm constantly running into OutOfMemoryErrors and for the life of me I
> >> > cannot figure out what's wrong. Let me describe my setup. I'm running
> >> > the
> >> > current master branch of Flink on YARN (Hadoop 2.7.0). My job is an
> >> > unfinished implementation of TPC-H Q2
> >> >
> >> > (
> https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java
> ),
> >> > which I run on 8 machines (1 for JM, the other 7 for TMs) with 64G of memory
> >> > per
> >> > machine. This is what I believe to be the relevant section of my
> >> > yarn_site.xml:
> >> >
> >> >
> >> > <property>
> >> >   <name>yarn.nodemanager.resource.memory-mb</name>
> >> >   <value>57344</value>
> >> > </property>
> >> >
> >> > <property>
> >> >   <name>yarn.scheduler.maximum-allocation-mb</name>
> >> >   <value>55296</value>
> >> > </property>
> >> >
> >> > <property>
> >> >   <name>yarn.nodemanager.vmem-check-enabled</name>
> >> >   <value>false</value>
> >> > </property>
> >> >
> >> >
> >> > And this is how I submit the job:
> >> >
> >> >
> >> > $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7
> >> > .
> >> >
> >> >
> >> > The TMs happily report:
> >> >
> >> > .
> >> > 11:50:15,577 INFO
> org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> >> > -  JVM Options:
> >> > 11:50:15,577 INFO
> org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> >> > - -Xms24511m
> >> > 11:50:15,577 INFO
> org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> >> > - -Xmx24511m
> >> > 11:50:15,577 INFO
> org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> >> > - -XX:MaxDirectMemorySize=65m
> >> > .
> >> >
> >> >
> >> > I've tried various combinations of YARN and Flink options, to no
> avail.
> >> > I
> >> > always end up with the following stacktrace:
> >> >
> >> >
> >> >
> >> >
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> >> > java.lang.OutOfMemoryError: Direct buffer memory
> >> > at
> >> >
> >> >
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
> >> > at
> >> >
> >> >
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> >> > at
> >> >
> >> >
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
> >> > at
> >> >
> >> >
> io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> >> > at
> >> >
> >> >
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> >> > at
> >> >
> >> >
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
> >> > at
> >> >
> >> >
> io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> >> > at
> >> >
> >> >
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> >> > at
> >> >
> >> >
> io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
> >> > at
> >> >
> >> >
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
> >> > at
> >> >
> >> >
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> >> > at
> >> >
> >> >
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> >> > at
> >> >
> >> >
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> >> > at
> >> >
> >

Re: OutOfMemoryError in netty local transport

2015-10-01 Thread Maximilian Michels
Hi Robert,

Just a quick update: The issue has been resolved in the latest Maven
0.10-SNAPSHOT dependency.

Cheers,
Max

On Wed, Sep 30, 2015 at 3:19 PM, Robert Schmidtke
 wrote:
> Hi Max,
>
> thanks for your quick reply. I found the relevant code and commented it out
> for testing; it seems to be working. Happily waiting for the fix. Thanks again.
>
> Robert
>
> On Wed, Sep 30, 2015 at 1:42 PM, Maximilian Michels  wrote:
>>
>> Hi Robert,
>>
>> This is a regression on the current master due to changes in the way
>> Flink calculates the memory and sets the maximum direct memory size.
>> We introduced these changes when we merged support for off-heap
>> memory. This is not a problem in the way Flink deals with managed
>> memory; it is just that -XX:MaxDirectMemorySize is set too low. By default
>> the maximum direct memory is only used by the network stack. The network
>> library we use allocates more direct memory than we expected.
>>
>> We'll push a fix to the master as soon as possible. Thank you for
>> reporting and thanks for your patience.
>>
>> Best regards,
>> Max
>>
>> On Wed, Sep 30, 2015 at 1:31 PM, Robert Schmidtke
>>  wrote:
>> > Hi everyone,
>> >
>> > I'm constantly running into OutOfMemoryErrors and for the life of me I
>> > cannot figure out what's wrong. Let me describe my setup. I'm running
>> > the
>> > current master branch of Flink on YARN (Hadoop 2.7.0). My job is an
>> > unfinished implementation of TPC-H Q2
>> >
>> > (https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java),
>> > which I run on 8 machines (1 for JM, the other 7 for TMs) with 64G of memory
>> > per
>> > machine. This is what I believe to be the relevant section of my
>> > yarn_site.xml:
>> >
>> >
>> > <property>
>> >   <name>yarn.nodemanager.resource.memory-mb</name>
>> >   <value>57344</value>
>> > </property>
>> >
>> > <property>
>> >   <name>yarn.scheduler.maximum-allocation-mb</name>
>> >   <value>55296</value>
>> > </property>
>> >
>> > <property>
>> >   <name>yarn.nodemanager.vmem-check-enabled</name>
>> >   <value>false</value>
>> > </property>
>> >
>> >
>> > And this is how I submit the job:
>> >
>> >
>> > $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7
>> > .
>> >
>> >
>> > The TMs happily report:
>> >
>> > .
>> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> > -  JVM Options:
>> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> > - -Xms24511m
>> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> > - -Xmx24511m
>> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> > - -XX:MaxDirectMemorySize=65m
>> > .
>> >
>> >
>> > I've tried various combinations of YARN and Flink options, to no avail.
>> > I
>> > always end up with the following stacktrace:
>> >
>> >
>> >
>> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> > java.lang.OutOfMemoryError: Direct buffer memory
>> > at
>> >
>> > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> > at
>> >
>> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> > at
>> >
>> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>> > at
>> >
>> > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>> > at
>> >
>> > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>> > at
>> >
>> > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>> > at
>> >
>> > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>> > at
>> >
>> > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>> > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>> > at
>> >
>> > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(Sin

Re: OutOfMemoryError in netty local transport

2015-09-30 Thread Robert Schmidtke
Hi Max,

thanks for your quick reply. I found the relevant code and commented it out
for testing; it seems to be working. Happily waiting for the fix. Thanks again.

Robert

On Wed, Sep 30, 2015 at 1:42 PM, Maximilian Michels  wrote:

> Hi Robert,
>
> This is a regression on the current master due to changes in the way
> Flink calculates the memory and sets the maximum direct memory size.
> We introduced these changes when we merged support for off-heap
> memory. This is not a problem in the way Flink deals with managed
> memory; it is just that -XX:MaxDirectMemorySize is set too low. By default
> the maximum direct memory is only used by the network stack. The network
> library we use allocates more direct memory than we expected.
>
> We'll push a fix to the master as soon as possible. Thank you for
> reporting and thanks for your patience.
>
> Best regards,
> Max
>
> On Wed, Sep 30, 2015 at 1:31 PM, Robert Schmidtke
>  wrote:
> > Hi everyone,
> >
> > I'm constantly running into OutOfMemoryErrors and for the life of me I
> > cannot figure out what's wrong. Let me describe my setup. I'm running the
> > current master branch of Flink on YARN (Hadoop 2.7.0). My job is an
> > unfinished implementation of TPC-H Q2
> > (
> https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java
> ),
> > which I run on 8 machines (1 for JM, the other 7 for TMs) with 64G of memory
> per
> > machine. This is what I believe to be the relevant section of my
> > yarn_site.xml:
> >
> >
> > <property>
> >   <name>yarn.nodemanager.resource.memory-mb</name>
> >   <value>57344</value>
> > </property>
> >
> > <property>
> >   <name>yarn.scheduler.maximum-allocation-mb</name>
> >   <value>55296</value>
> > </property>
> >
> > <property>
> >   <name>yarn.nodemanager.vmem-check-enabled</name>
> >   <value>false</value>
> > </property>
> >
> >
> > And this is how I submit the job:
> >
> >
> > $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7
> .
> >
> >
> > The TMs happily report:
> >
> > .
> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> > -  JVM Options:
> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> > - -Xms24511m
> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> > - -Xmx24511m
> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> > - -XX:MaxDirectMemorySize=65m
> > .
> >
> >
> > I've tried various combinations of YARN and Flink options, to no avail. I
> > always end up with the following stacktrace:
> >
> >
> >
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> > java.lang.OutOfMemoryError: Direct buffer memory
> > at
> >
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
> > at
> >
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> > at
> >
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
> > at
> >
> io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> > at
> >
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> > at
> >
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
> > at
> >
> io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> > at
> >
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> > at
> >
> io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
> > at
> >
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
> > at
> >
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> > at
> >
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> > at
> >
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> > at
> >
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> > at
> >
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> > at
> >
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> > at
> >
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
> > at java.lang.Thread.run(Thread.java:745)
> > Caused by: io.netty.handler.codec.DecoderException:
> > java.lang.OutOfMemoryError: Direct buffer memory
> > at
> >
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
> > at
> >
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Abstr

Re: OutOfMemoryError in netty local transport

2015-09-30 Thread Maximilian Michels
Hi Robert,

This is a regression on the current master due to changes in the way
Flink calculates the memory and sets the maximum direct memory size.
We introduced these changes when we merged support for off-heap
memory. This is not a problem in the way Flink deals with managed
memory; it is just that -XX:MaxDirectMemorySize is set too low. By default the
maximum direct memory is only used by the network stack. The network
library we use allocates more direct memory than we expected.

We'll push a fix to the master as soon as possible. Thank you for
reporting and thanks for your patience.

Best regards,
Max
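
Until that fix landed, one possible stopgap was to raise the direct-memory cap
by hand (a sketch under two assumptions worth verifying: that env.java.opts in
flink-conf.yaml is applied to the TaskManager JVMs, and that this flag takes
precedence over the -XX:MaxDirectMemorySize value Flink itself appends):

```
env.java.opts: -XX:MaxDirectMemorySize=2g   # illustrative value
```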

On Wed, Sep 30, 2015 at 1:31 PM, Robert Schmidtke
 wrote:
> Hi everyone,
>
> I'm constantly running into OutOfMemoryErrors and for the life of me I
> cannot figure out what's wrong. Let me describe my setup. I'm running the
> current master branch of Flink on YARN (Hadoop 2.7.0). My job is an
> unfinished implementation of TPC-H Q2
> (https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java),
> which I run on 8 machines (1 for JM, the other 7 for TMs) with 64G of memory per
> machine. This is what I believe to be the relevant section of my
> yarn_site.xml:
>
>
> <property>
>   <name>yarn.nodemanager.resource.memory-mb</name>
>   <value>57344</value>
> </property>
>
> <property>
>   <name>yarn.scheduler.maximum-allocation-mb</name>
>   <value>55296</value>
> </property>
>
> <property>
>   <name>yarn.nodemanager.vmem-check-enabled</name>
>   <value>false</value>
> </property>
>
>
> And this is how I submit the job:
>
>
> $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7 .
>
>
> The TMs happily report:
>
> .
> 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> -  JVM Options:
> 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> - -Xms24511m
> 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> - -Xmx24511m
> 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
> - -XX:MaxDirectMemorySize=65m
> .
>
>
> I've tried various combinations of YARN and Flink options, to no avail. I
> always end up with the following stacktrace:
>
>
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> java.lang.OutOfMemoryError: Direct buffer memory
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> at
> io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: io.netty.handler.codec.DecoderException:
> java.lang.OutOfMemoryError: Direct buffer memory
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> ... 9 more
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:658)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
> at
> io.netty.buffer.UnpooledUnsafeDirectByteBuf.allocateDirect(UnpooledUnsafeDirectByteBuf.java:108)
> at
> io.netty.buffer.UnpooledUnsafeDirectByteBuf.capacity(UnpooledUnsafeDirectByteBuf.java:157)
> at io.netty.buffer.Abstr

Fwd: OutOfMemoryError in netty local transport

2015-09-30 Thread Robert Schmidtke
Hi everyone,

I'm constantly running into OutOfMemoryErrors and for the life of me I
cannot figure out what's wrong. Let me describe my setup. I'm running the
current master branch of Flink on YARN (Hadoop 2.7.0). My job is an
unfinished implementation of TPC-H Q2 (
https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java),
I run on 8 machines (1 for JM, the other 7 for TMs) with 64G of memory per
machine. This is what I believe to be the relevant section of my
yarn-site.xml:



<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>57344</value>
</property>

<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>55296</value>
</property>

<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>


And this is how I submit the job:


$FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7 .


The TMs happily report:

.
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
  -  JVM Options:
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
  - -Xms24511m
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
  - -Xmx24511m
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
  - -XX:MaxDirectMemorySize=65m
.
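
(So of the 32768 MB I request per TM container, roughly 24.5 GB goes
to the heap via -Xms/-Xmx, while only 65 MB is left for direct
buffers, which the Netty-based network stack exhausts almost
immediately.)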


I've tried various combinations of YARN and Flink options, to no avail. I
always end up with the following stacktrace:


org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
java.lang.OutOfMemoryError: Direct buffer memory
at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
at
io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
at
io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
at
io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
at
io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
at
io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
at
io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
at
io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
at
io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.handler.codec.DecoderException:
java.lang.OutOfMemoryError: Direct buffer memory
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
... 9 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:658)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
at
io.netty.buffer.UnpooledUnsafeDirectByteBuf.allocateDirect(UnpooledUnsafeDirectByteBuf.java:108)
at
io.netty.buffer.UnpooledUnsafeDirectByteBuf.capacity(UnpooledUnsafeDirectByteBuf.java:157)
at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
at
io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
... 10 more


I always figured that running into OOMEs with Flink would be quite hard to
achieve; however, I'm wondering what's going wrong now. It seems to be
related to the direct memory. Why are you limiting it in the JVM options at
all? Is there a place where I can safely increase the size, or remove the
option altogether so it is unbounded?

A note on the data sizes, I used a scalin

Re: HBase on 4 machine cluster - OutOfMemoryError

2015-07-19 Thread Stephan Ewen
Okay. If you are using very big values, it often helps to tell Flink to
reserve less memory for its internal processing.

Can you try setting the memory fraction lower, e.g., to 0.5?

Have a look at the option "taskmanager.memory.fraction" (
https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/config.html
)
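
For example, in flink-conf.yaml (0.5 is only a starting point to
experiment with; the default is higher):

```
# flink-conf.yaml
taskmanager.memory.fraction: 0.5
```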

Greetings,
Stephan


On 18.07.2015 at 15:47, "Lydia Ickler" wrote:

> Hi,
>
> yes, it is in one row. Each row represents a patient, with values for
> 20,000 different genes stored in one column family and a single
> health-status value in a second column family.
>
>
> On 18.07.2015 at 15:38, Stephan Ewen wrote:
>
> This error is in the HBase RPC Service. Apparently the RPC message is very
> large.
>
> Is the data that you request in one row?
> On 18.07.2015 at 00:50, "Lydia Ickler" wrote:
>
>> Hi all,
>>
>> I am trying to read a data set from HBase within a cluster application.
>> The data is about 90 MB in size.
>>
>> When I run the program on a cluster consisting of 4 machines (8GB RAM) I
>> get the following error on the head-node:
>>
>> 16:57:41,572 INFO
>> org.apache.flink.api.common.io.LocatableInputSplitAssigner- Assigning
>> remote split to host grips5
>> 17:17:26,127 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- DataSource
>> (at createInput(ExecutionEnvironment.java:502)
>> (org.apache.flink.addons.hbase.HBaseR$
>> 17:17:26,128 INFO  org.apache.flink.runtime.jobmanager.JobManager
>> - Status of job b768ff76167fa3ea3e4cb3cc3481ba80 (Labeled - ML)
>> changed to FAILING.
>>
>> And within the machine grips5:
>> 16:57:23,769 INFO  org.apache.flink.addons.hbase.TableInputFormat
>> - opening split [1|[grips1:16020]|LUAD+5781|LUAD+7539]
>> 16:57:33,734 WARN  org.apache.hadoop.ipc.RpcClient
>> - IPC Client (767445418) connection to grips1/130.73.20.14:16020
>> from hduser: unexpected exceptio$
>> java.lang.OutOfMemoryError: Java heap space
>> at
>> org.apache.hadoop.hbase.ipc.RpcClient$Connection.readResponse(RpcClient.java:1117)
>> at
>> org.apache.hadoop.hbase.ipc.RpcClient$Connection.run(RpcClient.java:727)
>> 16:57:39,969 WARN  org.apache.hadoop.ipc.RpcClient
>> - IPC Client (767445418) connection to grips1/130.73.20.14:16020
>> from hduser: unexpected exceptio$
>> java.lang.OutOfMemoryError: Java heap space
>>
>> and then it just closes the ZooKeeper connection…
>>
>> Do you have a suggestion how to avoid this OutOfMemoryError?
>> Best regards,
>> Lydia
>>
>>
>>
>>
>


Re: HBase on 4 machine cluster - OutOfMemoryError

2015-07-18 Thread Lydia Ickler
Hi,

yes, it is in one row. Each row represents a patient, with values for 20,000
different genes stored in one column family and a single health-status value
in a second column family.
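
Would capping the number of columns per RPC help here? Something like
the following is what I had in mind (just a sketch against the HBase
client API; the "genes" column family name is only a placeholder):

```java
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class WideRowScan {

    // Sketch: keep each HBase RPC small even for very wide rows, so a
    // 20,000-column row is streamed in chunks instead of one huge response.
    public static Scan buildScan() {
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("genes")); // column family name assumed
        scan.setBatch(1000); // return at most 1,000 cells per Result chunk
        scan.setCaching(1);  // fetch one (partial) row per RPC
        return scan;
    }
}
```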


> On 18.07.2015 at 15:38, Stephan Ewen wrote:
> 
> This error is in the HBase RPC Service. Apparently the RPC message is very 
> large.
> 
> Is the data that you request in one row?
> 
> On 18.07.2015 at 00:50, "Lydia Ickler" wrote:
> Hi all,
> 
> I am trying to read a data set from HBase within a cluster application. 
> The data is about 90 MB in size.
> 
> When I run the program on a cluster consisting of 4 machines (8GB RAM) I get 
> the following error on the head-node:
> 
> 16:57:41,572 INFO  org.apache.flink.api.common.io.LocatableInputSplitAssigner 
>- Assigning remote split to host grips5
> 17:17:26,127 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph 
>- DataSource (at createInput(ExecutionEnvironment.java:502) 
> (org.apache.flink.addons.hbase.HBaseR$
> 17:17:26,128 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Status of job b768ff76167fa3ea3e4cb3cc3481ba80 (Labeled - ML) changed to 
> FAILING.
> 
> And within the machine grips5:
> 16:57:23,769 INFO  org.apache.flink.addons.hbase.TableInputFormat 
>- opening split [1|[grips1:16020]|LUAD+5781|LUAD+7539]
> 16:57:33,734 WARN  org.apache.hadoop.ipc.RpcClient
> - IPC Client (767445418) connection to grips1/130.73.20.14:16020 from hduser: unexpected exceptio$
> java.lang.OutOfMemoryError: Java heap space
> at 
> org.apache.hadoop.hbase.ipc.RpcClient$Connection.readResponse(RpcClient.java:1117)
> at 
> org.apache.hadoop.hbase.ipc.RpcClient$Connection.run(RpcClient.java:727)
> 16:57:39,969 WARN  org.apache.hadoop.ipc.RpcClient
> - IPC Client (767445418) connection to grips1/130.73.20.14:16020 from hduser: unexpected exceptio$
> java.lang.OutOfMemoryError: Java heap space
> 
> and then it just closes the ZooKeeper connection…
> 
> Do you have a suggestion how to avoid this OutOfMemoryError?
> Best regards,
> Lydia
> 
> 
> 



Re: HBase on 4 machine cluster - OutOfMemoryError

2015-07-18 Thread Stephan Ewen
This error is in the HBase RPC Service. Apparently the RPC message is very
large.

Is the data that you request in one row?
On 18.07.2015 at 00:50, "Lydia Ickler" wrote:

> Hi all,
>
> I am trying to read a data set from HBase within a cluster application.
> The data is about 90 MB in size.
>
> When I run the program on a cluster consisting of 4 machines (8GB RAM) I
> get the following error on the head-node:
>
> 16:57:41,572 INFO
> org.apache.flink.api.common.io.LocatableInputSplitAssigner- Assigning
> remote split to host grips5
> 17:17:26,127 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>   - DataSource (at createInput(ExecutionEnvironment.java:502)
> (org.apache.flink.addons.hbase.HBaseR$
> 17:17:26,128 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - Status of job b768ff76167fa3ea3e4cb3cc3481ba80 (Labeled - ML)
> changed to FAILING.
>
> And within the machine grips5:
> 16:57:23,769 INFO  org.apache.flink.addons.hbase.TableInputFormat
>   - opening split [1|[grips1:16020]|LUAD+5781|LUAD+7539]
> 16:57:33,734 WARN  org.apache.hadoop.ipc.RpcClient
>   - IPC Client (767445418) connection to grips1/130.73.20.14:16020
> from hduser: unexpected exceptio$
> java.lang.OutOfMemoryError: Java heap space
> at
> org.apache.hadoop.hbase.ipc.RpcClient$Connection.readResponse(RpcClient.java:1117)
> at
> org.apache.hadoop.hbase.ipc.RpcClient$Connection.run(RpcClient.java:727)
> 16:57:39,969 WARN  org.apache.hadoop.ipc.RpcClient
>   - IPC Client (767445418) connection to grips1/130.73.20.14:16020
> from hduser: unexpected exceptio$
> java.lang.OutOfMemoryError: Java heap space
>
> and then it just closes the ZooKeeper connection…
>
> Do you have a suggestion how to avoid this OutOfMemoryError?
> Best regards,
> Lydia
>
>
>
>


HBase on 4 machine cluster - OutOfMemoryError

2015-07-17 Thread Lydia Ickler
Hi all,

I am trying to read a data set from HBase within a cluster application. 
The data is about 90 MB in size.

When I run the program on a cluster consisting of 4 machines (8GB RAM) I get 
the following error on the head-node:

16:57:41,572 INFO  org.apache.flink.api.common.io.LocatableInputSplitAssigner   
 - Assigning remote split to host grips5
17:17:26,127 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - DataSource (at createInput(ExecutionEnvironment.java:502) 
(org.apache.flink.addons.hbase.HBaseR$
17:17:26,128 INFO  org.apache.flink.runtime.jobmanager.JobManager   
 - Status of job b768ff76167fa3ea3e4cb3cc3481ba80 (Labeled - ML) changed to 
FAILING.

And within the machine grips5:
16:57:23,769 INFO  org.apache.flink.addons.hbase.TableInputFormat   
 - opening split [1|[grips1:16020]|LUAD+5781|LUAD+7539]
16:57:33,734 WARN  org.apache.hadoop.ipc.RpcClient  
 - IPC Client (767445418) connection to grips1/130.73.20.14:16020 from hduser: 
unexpected exceptio$
java.lang.OutOfMemoryError: Java heap space
at 
org.apache.hadoop.hbase.ipc.RpcClient$Connection.readResponse(RpcClient.java:1117)
at 
org.apache.hadoop.hbase.ipc.RpcClient$Connection.run(RpcClient.java:727)
16:57:39,969 WARN  org.apache.hadoop.ipc.RpcClient  
 - IPC Client (767445418) connection to grips1/130.73.20.14:16020 from hduser: 
unexpected exceptio$
java.lang.OutOfMemoryError: Java heap space

and then it just closes the ZooKeeper connection…

Do you have a suggestion how to avoid this OutOfMemoryError?
Best regards,
Lydia