Hi Flink team, greetings from the GE Healthcare team.
A fellow dev has also posted this on Stack Overflow: https://stackoverflow.com/questions/70068336/flink-job-not-getting-submitted-java-io-ioexception-cannot-allocate-memory

Summary of the post — use case and relevant configuration:

1. A Flink session cluster in Kubernetes is used to submit batch jobs every 1 minute. Run time for a batch job is < 30 seconds.
2. The session cluster runs in an HA setup, so for each submitted job it stores the job graph and related metadata under the "/recovery/default/blob/" folder.
3. A 5 GB PVC is attached to the session cluster for HA, backed by CephFS (https://docs.ceph.com/en/pacific/cephfs/index.html). Rook is used for orchestration.

Ideal working scenario:

1. Upon a successful job submission, the metadata is created and then cleared after completion. The average size of a blob created under the recovery folder is ~150 MB, so there is enough space in the PVC.

Failure:

1. During a long run, say 100 minutes and therefore 100 job submissions, a submission will fail with:

2021-11-22 09:03:11,537 INFO org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Finished cleaning up the high availability data for job 6a71a36a3c82d8a9438c9aa9ed6b8993.
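For reference, the HA-related configuration we assume for this session cluster looks roughly like the fragment below (the cluster-id value and the exact storageDir path are illustrative, not our real ones):

```yaml
# Assumed flink-conf.yaml fragment (Flink 1.14, Kubernetes HA):
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///recovery/default   # PVC mount used for HA metadata/blobs
kubernetes.cluster-id: flink-session-cluster             # illustrative cluster id
```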
2021-11-22 09:03:14,904 ERROR org.apache.flink.runtime.blob.BlobServerConnection [] - PUT operation failed
java.io.IOException: Cannot allocate memory
    at java.io.FileOutputStream.writeBytes(Native Method) ~[?:1.8.0_312]
    at java.io.FileOutputStream.write(FileOutputStream.java:326) ~[?:1.8.0_312]
    at org.apache.flink.core.fs.local.LocalDataOutputStream.write(LocalDataOutputStream.java:55) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.shaded.guava30.com.google.common.io.ByteStreams.copy(ByteStreams.java:113) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.shaded.guava30.com.google.common.io.ByteSource.copyTo(ByteSource.java:243) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.shaded.guava30.com.google.common.io.Files.copy(Files.java:301) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:79) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:72) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:385) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:680) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:350) [flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:110) [flink-dist_2.11-1.14.0.jar:1.14.0]

Note that the PVC is not full when this happens, and the very next job submission succeeds without any complaint about storage. However, because each such failure leaves its blobs behind, old blobs pile up in the recovery folder until the PVC really is full, after which all job submissions fail.

We would greatly appreciate any pointers on why this happens. We rely on HA to make sure each job runs fine, and this is mission-critical data.
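For context, we confirm that the PVC is not full at failure time with a check along these lines (the mount point `/recovery` is an assumption from our setup, and this is just a sketch, not our exact monitoring code):

```python
import shutil
from pathlib import Path

# Assumed PVC mount point for the HA recovery folder in our setup.
PVC_MOUNT = Path("/recovery")

def usage_percent(path: Path) -> float:
    """Return the percentage of the filesystem behind `path` that is in use."""
    total, used, _free = shutil.disk_usage(path)
    return 100.0 * used / total

if __name__ == "__main__" and PVC_MOUNT.exists():
    print(f"PVC usage: {usage_percent(PVC_MOUNT):.1f}%")
```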
Secondly, why is the stale data not being cleared? Is there a configuration option we have missed? Cleaning it up would not by itself solve the data loss we experience from the intermittent job submission failures, but it would at least keep the PVC from being unnecessarily saturated, beyond which all jobs fail.

Thank you in advance.

-Jay
GEHC
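As a stopgap while the root cause is investigated, we are considering pruning blob directories left behind by finished jobs with a sketch like the one below. The layout (`/recovery/default/blob` with one subdirectory per job) and the one-hour age threshold are assumptions from our setup; before deleting anything one should of course confirm the job is no longer running (e.g. via the Flink REST API):

```python
import shutil
import time
from pathlib import Path

# Assumed HA storage layout: one subdirectory per job under <storageDir>/blob.
RECOVERY_BLOB_DIR = Path("/recovery/default/blob")
MAX_AGE_SECONDS = 60 * 60  # batch jobs run < 30 s, so 1 h old blobs are stale

def find_stale_blob_dirs(blob_dir: Path, max_age: float, now: float) -> list:
    """Return blob subdirectories whose last modification is older than max_age."""
    stale = []
    for entry in sorted(blob_dir.iterdir()):
        if entry.is_dir() and now - entry.stat().st_mtime > max_age:
            stale.append(entry)
    return stale

if __name__ == "__main__" and RECOVERY_BLOB_DIR.exists():
    for d in find_stale_blob_dirs(RECOVERY_BLOB_DIR, MAX_AGE_SECONDS, time.time()):
        print(f"removing stale blob dir: {d}")
        # CAUTION: verify the job is not running before deleting its blobs.
        shutil.rmtree(d)
```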