Hi Flink Team,

Greetings from the GE Healthcare team.

The same issue is also described in this Stack Overflow post by a fellow 
developer:
https://stackoverflow.com/questions/70068336/flink-job-not-getting-submitted-java-io-ioexception-cannot-allocate-memory

Summary of the post:

Here is the use case and the relevant configuration:

  1.  A Flink session cluster in Kubernetes is used to submit a batch job 
every 1 minute. Each batch job runs for less than 30 seconds.
  2.  The Flink session cluster runs in an HA setup, so for each submitted 
job it stores the job graph and related metadata in the Flink 
"/recovery/default/blob/" folder.
  3.  A 5 GB PVC backed by CephFS 
(https://docs.ceph.com/en/pacific/cephfs/index.html) is attached to the 
session cluster for HA. Rook is used for orchestration.

Ideal Working Scenario:

  1.  On a successful job submission, the metadata is created and then 
cleared after the job completes. The average size of the blob created under 
the recovery folder for HA is 150 MB, so the PVC has enough space.

Failure:

  1.  During a long run, for example 100 minutes and therefore 100 job 
submissions, a Flink job submission will fail with:


2021-11-22 09:03:11,537 INFO  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Finished cleaning up the high availability data for job 6a71a36a3c82d8a9438c9aa9ed6b8993.
2021-11-22 09:03:14,904 ERROR org.apache.flink.runtime.blob.BlobServerConnection           [] - PUT operation failed
java.io.IOException: Cannot allocate memory
    at java.io.FileOutputStream.writeBytes(Native Method) ~[?:1.8.0_312]
    at java.io.FileOutputStream.write(FileOutputStream.java:326) ~[?:1.8.0_312]
    at org.apache.flink.core.fs.local.LocalDataOutputStream.write(LocalDataOutputStream.java:55) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.shaded.guava30.com.google.common.io.ByteStreams.copy(ByteStreams.java:113) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.shaded.guava30.com.google.common.io.ByteSource.copyTo(ByteSource.java:243) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.shaded.guava30.com.google.common.io.Files.copy(Files.java:301) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:79) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:72) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:385) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:680) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:350) [flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:110) [flink-dist_2.11-1.14.0.jar:1.14.0]

Note that the PVC is not full at this point, and the next job submission 
succeeds without any complaint about storage. However, because of these 
random failures, old blobs eventually pile up in the recovery folder until 
the PVC is full, after which all job submissions fail.
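To give a rough sense of how little headroom there is, here is a small 
back-of-the-envelope sketch in Python. It assumes the PVC is 5 GiB, that each 
blob is about 150 MiB, and that every failed submission leaves exactly one full 
blob behind. Those are our assumptions for illustration, not measured values, 
and blobs_until_full is a hypothetical helper name.

```python
def blobs_until_full(pvc_bytes: int, blob_bytes: int) -> int:
    """Return how many whole blobs of blob_bytes fit into pvc_bytes."""
    if blob_bytes <= 0:
        raise ValueError("blob_bytes must be positive")
    return pvc_bytes // blob_bytes


# Assumption: the 5 GB PVC is read as 5 GiB.
PVC_BYTES = 5 * 1024**3
# Assumption: the ~150 MB average blob is read as 150 MiB.
BLOB_BYTES = 150 * 1024**2

# Number of leftover blobs the PVC can hold before it is full,
# if each failed submission leaks one blob (assumption).
leaked_blobs = blobs_until_full(PVC_BYTES, BLOB_BYTES)
print(leaked_blobs)
```

Under these assumptions only a few dozen uncleaned blobs are enough to fill the 
volume, which matches what we observe over longer runs.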

We request immediate help here. Any pointers on why this happens would be 
appreciated. We rely on HA to make sure each job runs correctly, and this is 
mission-critical data.
Secondly, we would like to understand why the stale data is not being cleared. 
Is there a configuration we have missed? Fixing this would not solve the data 
loss caused by the intermittent job submission failures, but it would keep our 
PVC from filling up unnecessarily to the point where all jobs fail.
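In case it helps with diagnosis, below is a minimal sketch of how leftover 
entries under the recovery folder could be listed. It is not part of Flink. The 
helper names (dir_size_bytes, find_stale_dirs) and the 300-second threshold are 
hypothetical, and it assumes that each direct subdirectory of the blob folder 
belongs to one job, which we have not verified against Flink's actual layout.

```python
import os
import time
from pathlib import Path


def dir_size_bytes(path: Path) -> int:
    """Sum the sizes of all regular files below path."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            try:
                total += (Path(root) / name).stat().st_size
            except OSError:
                # File vanished or is unreadable; skip it.
                pass
    return total


def find_stale_dirs(blob_root, max_age_seconds, now=None):
    """List (name, age_seconds, size_bytes) for subdirectories of blob_root
    whose modification time is older than max_age_seconds."""
    now = time.time() if now is None else now
    root = Path(blob_root)
    stale = []
    if not root.is_dir():
        return stale
    for entry in sorted(root.iterdir()):
        if not entry.is_dir():
            continue
        age = now - entry.stat().st_mtime
        if age > max_age_seconds:
            stale.append((entry.name, age, dir_size_bytes(entry)))
    return stale


if __name__ == "__main__":
    # Path taken from the HA setup described above; threshold is an assumption
    # (jobs run for under 30 seconds, so 5 minutes is a generous cutoff).
    for name, age, size in find_stale_dirs("/recovery/default/blob", 300):
        print(f"{name}\tage={age:.0f}s\tsize={size} bytes")
```

The script only reports candidates and deletes nothing, since we do not know 
whether removing these entries by hand is safe while the cluster is running.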

Thank you in advance.

-Jay
GEHC




