Issues in Batch Jobs Submission for a Session Cluster

2021-12-01 Thread Ghiya, Jay (GE Healthcare)
Hi Flink Team,

Greetings from GE Healthcare team.

Here is a Stack Overflow post on the same issue, posted by a fellow dev:
https://stackoverflow.com/questions/70068336/flink-job-not-getting-submitted-java-io-ioexception-cannot-allocate-memory

Summary of the post:

Here is the use case and the relevant configuration:

  1.  A Flink session cluster in Kubernetes is used to submit batch jobs every 
minute. The run time of a batch job is under 30 seconds.
  2.  The session cluster runs in an HA setup, which means that for each 
submitted job it stores the job graph and its related metadata under the 
"/recovery/default/blob/" folder.
  3.  A "5 GB" PVC backed by CephFS 
(https://docs.ceph.com/en/pacific/cephfs/index.html) is attached to the session 
cluster for HA; Rook is used for orchestration. (A rough sketch of the 
corresponding configuration follows this list.)
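
For context, the HA-related part of the flink-conf.yaml looks roughly like the 
sketch below. The values are illustrative rather than a verbatim copy of our 
config; the cluster id "default" is assumed from the /recovery/default/blob/ 
path above, and the storage directory is the CephFS-backed PVC mount:

high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
kubernetes.cluster-id: default
high-availability.storageDir: file:///flink/recovery   # PVC mount backed by CephFS via Rook
# HA blobs for each submitted job end up under
#   <storageDir>/<cluster-id>/blob/job_<job-id>/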

Ideal Working Scenario:

  1.  Upon a successful job submission, the metadata is created and then cleared 
once the job completes. The average size of the blob created under the recovery 
folder for HA is about 150 MB, so there is enough space in the PVC. (The 
expected layout is sketched below.)
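
Roughly, the per-job directory under the recovery path looks like the following 
(the blob file name is indicative only):

/flink/recovery/default/blob/
  job_<job-id>/        # created at submission, removed after the job finishes
    blob_p-...         # permanent blob holding the serialized job artifacts (~150 MB here)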

Failure:

  1.  During a long run, say 100 minutes and therefore 100 job submissions, a 
Flink job submission will fail with:

2021-11-22 09:03:11,537 INFO  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Finished cleaning up the high availability data for job 6a71a36a3c82d8a9438c9aa9ed6b8993.
2021-11-22 09:03:14,904 ERROR org.apache.flink.runtime.blob.BlobServerConnection [] - PUT operation failed
java.io.IOException: Cannot allocate memory
    at java.io.FileOutputStream.writeBytes(Native Method) ~[?:1.8.0_312]
    at java.io.FileOutputStream.write(FileOutputStream.java:326) ~[?:1.8.0_312]
    at org.apache.flink.core.fs.local.LocalDataOutputStream.write(LocalDataOutputStream.java:55) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.shaded.guava30.com.google.common.io.ByteStreams.copy(ByteStreams.java:113) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.shaded.guava30.com.google.common.io.ByteSource.copyTo(ByteSource.java:243) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.shaded.guava30.com.google.common.io.Files.copy(Files.java:301) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:79) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:72) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:385) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:680) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:350) [flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:110) [flink-dist_2.11-1.14.0.jar:1.14.0]

Note that the PVC is not full at this point, and the next job submission 
succeeds without complaining about storage. But because of these intermittent 
failures, old blobs gradually pile up in the recovery folder until the PVC does 
become full, after which all job submissions fail.

We would really appreciate urgent help here. Any pointers on why this happens? 
We rely on HA to make sure each job runs fine, and this is mission-critical 
data.
Secondly, why is the stale data not being cleared? Is there a configuration we 
have missed? Fixing that would not by itself solve the data loss we see from 
the intermittent submission failures, but it would at least keep the PVC from 
being needlessly saturated to the point where all jobs fail.

Thank you in advance.

-Jay
GEHC







Re: Issues in Batch Jobs Submission for a Session Cluster

2021-12-02 Thread David Morávek
Hi Jay,

It's hard to say what's going on here. My best guess is that you're running
out of memory for your process (e.g. hitting a ulimit). Can you please start
by checking the ulimits and memory usage of your container?

As for the cleanup: right now it may happen in some failover scenarios that we
don't clean up some parts of the HA services (e.g. the blob store). FLIP-194
addresses this limitation.

At first sight this sounds a bit like a native memory leak, but those are in
general really tricky to debug. Let's start by simply getting some stats
about the actual memory usage.

Best,
D.


On Thu, Dec 2, 2021 at 7:49 AM Ghiya, Jay (GE Healthcare) 
wrote:

> [quoted original message trimmed; see above]


RE: EXT: Re: Issues in Batch Jobs Submission for a Session Cluster

2021-12-02 Thread Ghiya, Jay (GE Healthcare)
Thanks for the prompt response, and understood, @David Morávek. We will record 
CPU and memory usage of the job managers and task managers from the Kubernetes 
metrics Grafana dashboards when this happens and share it here. If anything 
looks abnormal, we can then pull the JVM metrics for each pod in terms of heap, 
non-heap, and GC behaviour.

@R, Aromal (GE Healthcare, consultant): can you please post the CPU and memory 
usage of the JMs and TMs from the Kubernetes dashboard when this issue happens?

Thanks
Jay

From: David Morávek 
Sent: 02 December 2021 16:34
To: Ghiya, Jay (GE Healthcare) 
Cc: user@flink.apache.org; Nellimarla, Aswini (GE Healthcare) 
; R, Aromal (GE Healthcare, consultant) 
; Kumar, Vipin (GE Healthcare) ; 
Maniyan, Pramod (GE Healthcare, consultant) 
Subject: EXT: Re: Issues in Batch Jobs Submission for a Session Cluster

[quoted earlier messages trimmed; duplicated above]

Re: FW: EXT: Re: Issues in Batch Jobs Submission for a Session Cluster

2021-12-13 Thread David Morávek
Hi,

As far as I understand, "java.io.IOException: Cannot allocate memory"
happens when the JVM is not able to allocate new memory from the OS. If
that's the case, I'd suggest increasing the JVM overhead [1], because that is
basically a pool of memory that is free to be allocated by native
libraries.
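
For illustration, raising the overhead on the JobManager would look something
like the sketch below in flink-conf.yaml. The sizes are placeholders to adapt
to your pod limits, not recommended values:

jobmanager.memory.process.size: 2g          # whatever you currently allocate
jobmanager.memory.jvm-overhead.min: 512m    # raise these from the defaults
jobmanager.memory.jvm-overhead.max: 1g
jobmanager.memory.jvm-overhead.fraction: 0.2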

How are you submitting jobs to the cluster? Are you using the command-line
client or web submission?

If increasing JVM overhead doesn't help, you'll need to find out where the
native memory goes. The most straightforward approach I'm aware of is to
track allocations using jemalloc [2].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_jobmanager/#detailed-configuration
[2]
https://technology.blog.gov.uk/2015/12/11/using-jemalloc-to-get-to-the-bottom-of-a-memory-leak/
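
To illustrate [2] in a Kubernetes setup, wiring jemalloc into the JobManager
container could look roughly like the fragment below. The library path,
profiling options, and file locations are assumptions about your environment,
and jemalloc must be built with profiling support for prof:true to take effect
(distro packages often are not):

containers:
  - name: jobmanager
    env:
      - name: LD_PRELOAD
        value: /usr/lib/x86_64-linux-gnu/libjemalloc.so.2   # actual path depends on the image
      - name: MALLOC_CONF
        value: "prof:true,lg_prof_interval:30,prof_prefix:/tmp/jeprof"

The resulting /tmp/jeprof.*.heap dumps can then be inspected with the jeprof
tool to see where the native memory is going.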

Best,
D.

On Mon, Dec 13, 2021 at 11:49 AM Ghiya, Jay (GE Healthcare) <
jay.gh...@ge.com> wrote:

> Hi @David Morávek ,
>
>
>
> PFA the details of the memory config we have set in the configmap and the
> corresponding CPU, memory, and JVM usage when the issue happens.
>
>
>
> Credits: @R, Aromal (GE Healthcare, consultant) 
>
>
>
> -Jay
>
>
>
> From: R, Aromal (GE Healthcare, consultant) 
> Sent: 13 December 2021 16:16
> To: Ghiya, Jay (GE Healthcare) 
> Subject: FW: EXT: Re: Issues in Batch Jobs Submission for a Session Cluster
>
> FYI
>
> From: R, Aromal (GE Healthcare, consultant)
> Sent: 13 December 2021 16:09
> To: David Morávek 
> Cc: user@flink.apache.org; Nellimarla, Aswini (GE Healthcare) <
> aswini.nellima...@ge.com>; Kumar, Vipin (GE Healthcare) <
> vipin.s.ku...@ge.com>; Maniyan, Pramod (GE Healthcare, consultant) <
> pramod.mani...@ge.com>; Ghiya, Jay (GE Healthcare) 
> Subject: RE: EXT: Re: Issues in Batch Jobs Submission for a Session Cluster
>
>
>
> Hi Morávek,
>
>
>
> I am attaching the Grafana dashboard graphs of the CPU and memory usage.
>
>
>
> We have a Flink session cluster deployed with 3 JMs and 4 TMs, with HA
> enabled. I am attaching the config file as well.
>
>
>
> Under the path /flink/recovery/default/blob/, two folders did not get
> deleted:
>
>
>
> drwxr-xr-x 1 flink flink 1 Dec 13 06:07 job_3d86ca8904bafa74fb587ff344427ff0
>
> drwxr-xr-x 1 flink flink 1 Dec 13 07:01 job_6621c5d3633423f86c809a16fa629567
>
>
>
> During this time the graphs (CPU and memory usage) look normal. Could you
> also take a look at the graphs?
>
>
>
> Thanks And Regards
>
> Aromal
>
> [remainder of the quoted thread trimmed; duplicated above]