[ https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424006#comment-16424006 ]
Alon Galant commented on FLINK-8707:
------------------------------------

Hey, I have a similar problem; this is my thread on the Flink user mailing list: [https://lists.apache.org/thread.html/ce89ed53533ec19ec0a7b74de0f96ac22506d5b2564c2949fd54d6ce@%3Cuser.flink.apache.org%3E]

I've created a whitelist of 16 customer IDs that I filter from the Kafka topic into Flink, so about that many output files should be open for writing at any one time.

I attached 3 files:
lsof.txt - the result of $ lsof > lsof.txt
lsofp.txt - the result of $ lsof -p 5514 > lsofp.txt (5514 is the Flink process)
ll.txt - the result of $ ls -la /tmp/hadoop-flink/s3a/ > ll.txt

/tmp/hadoop-flink/s3a/ is the directory Flink uses to store the temporary files it works on before uploading them to S3. I ran these commands on one of my task managers (I run 2 task managers with a total of 8 task slots, 4 on each TM).

Here are some wc -l results:
{code}
less lsof.txt | wc -l   --> 44228
less lsofp.txt | wc -l  --> 403
less ll.txt | wc -l     --> 64
less lsof.txt | grep output-1620244093829221334.tmp | wc -l --> 108
(this is an example of one output file; I think all of them give the same result)
{code}

From ll.txt we can see that there are 4 files for each customer ID (for each partition), so I guess that every task slot opens its own file. For each 'output' file there are 108 FDs.

My problem is that I want to be able to handle around 500 customers while still using high concurrency. When I tried to use 32 task slots on 2 TMs, I got more than a million FDs in lsof.

This is the exception I got when I enabled all of the IDs:
{code:java}
java.lang.RuntimeException: Error parsing YAML configuration.
	at org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:178)
	at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:112)
	at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:79)
	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopConfiguration(HadoopFileSystem.java:178)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:408)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:351)
	at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
	at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:232)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /srv/flink-1.2.1/conf/flink-conf.yaml (Too many open files)
	at java.io.FileInputStream.open(Native Method)
	at java.io.FileInputStream.<init>(FileInputStream.java:146)
	at org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:144)
	... 14 more
{code}

I increased the ulimit to 500,000, but it's still not enough, and I guess that is too much anyhow. I'd love to get some help! (A configuration sketch for bounding the number of open buckets is appended after the quoted issue below.)

Thanks,
Alon

[^ll.txt] [^lsof.txt] [^lsofp.txt]

> Excessive amount of files opened by flink task manager
> ------------------------------------------------------
>
>                 Key: FLINK-8707
>                 URL: https://issues.apache.org/jira/browse/FLINK-8707
>             Project: Flink
>          Issue Type: Bug
>          Components: TaskManager
>    Affects Versions: 1.3.2
>         Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir: some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs: some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory: some dir...residing on the same box
>            Reporter: Alexander Gardner
>            Assignee: Piotr Nowojski
>            Priority: Critical
>             Fix For: 1.5.0
>
>         Attachments: AfterRunning-3-jobs-Box2-TM-JCONSOLE.png, AfterRunning-3-jobs-TM-FDs-BOX2.jpg, AfterRunning-3-jobs-lsof-p.box2-TM, AfterRunning-3-jobs-lsof.box2-TM, AterRunning-3-jobs-Box1-TM-JCONSOLE.png, box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, box2-taskmgr-lsof, ll.txt, ll.txt, lsof.txt, lsof.txt, lsofp.txt, lsofp.txt
>
> The job manager has fewer FDs than the task manager.
>
> Hi
> A support alert indicated that there were a lot of open files on the boxes running Flink.
> There were 4 Flink jobs that were dormant but had consumed a number of messages from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l -> returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l -> returned 129322 open FDs
> $ lsof -p 12154 | wc -l -> returned 531 FDs
> There were 228 threads running for the task manager.
>
> Drilling down a bit further, looking at a_inode and FIFO entries:
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l = 200 FDs
> $ /proc/12154/maps = 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced, there were also 244 child processes for the task manager process.
> We noticed a creep of file descriptors in each environment - are the above figures deemed excessive for the number of FDs in use? I know Flink uses Netty - is it using a separate Selector for reads & writes?
> Additionally, does Flink use memory-mapped files or direct ByteBuffers, and are these skewing the numbers of FDs shown?
> Example of one child process, ID 6633:
> java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]
> java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe
> java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe
> Lastly, we cannot yet identify the reason for the creep in FDs, even though Flink is pretty dormant or has dormant jobs. Production nodes are not experiencing excessive amounts of throughput yet either.
>
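A possible mitigation for the comment above (many customer-ID buckets exhausting file descriptors) is to bound how long BucketingSink keeps inactive part files open, so the steady-state number of open files tracks the currently active customers rather than all of them. The following is a minimal sketch only, not the reporter's code: it assumes records are strings whose first comma-separated field is the customer ID (a hypothetical format) and a Flink release whose BucketingSink exposes the inactive-bucket setters shown below (verify them against the version in use):

{code:java}
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class CustomerBucketSinkSketch {

    public static BucketingSink<String> create(String basePath) {
        BucketingSink<String> sink = new BucketingSink<>(basePath);

        // One bucket (sub-directory) per customer ID; assumes the record's first
        // comma-separated field is the customer ID (hypothetical record format).
        sink.setBucketer(new Bucketer<String>() {
            @Override
            public Path getBucketPath(Clock clock, Path base, String element) {
                return new Path(base, element.split(",", 2)[0]);
            }
        });

        sink.setWriter(new StringWriter<>());
        sink.setBatchSize(64L * 1024 * 1024);          // roll a part file once it reaches 64 MB

        // Close part files of buckets that have seen no data recently, so the number
        // of simultaneously open files (and therefore FDs) per task slot stays bounded.
        sink.setInactiveBucketCheckInterval(60_000L);  // scan for idle buckets every minute
        sink.setInactiveBucketThreshold(5 * 60_000L);  // close buckets idle for 5 minutes

        return sink;
    }
}
{code}

Even with idle buckets being closed, the TaskManager's ulimit still has to cover the worst case of simultaneously active buckets per slot, but it no longer has to scale with the total number of customer IDs ever seen.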
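For the FD creep described in the quoted issue, it can also help to watch the descriptor count from inside the TaskManager JVM instead of grepping the full lsof output (which, depending on the lsof build, may repeat each descriptor for every thread). A rough sketch, assuming a HotSpot/OpenJDK JVM on a Unix-like OS where the com.sun.management extension is available; the class name is made up for illustration:

{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

import com.sun.management.UnixOperatingSystemMXBean;

public class FdCountLogger {

    public static void main(String[] args) throws InterruptedException {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (!(os instanceof UnixOperatingSystemMXBean)) {
            System.err.println("Open-FD counts are only exposed on Unix-like JVMs.");
            return;
        }
        UnixOperatingSystemMXBean unixOs = (UnixOperatingSystemMXBean) os;

        // Print the open/max FD counts once a minute; a steady upward trend while
        // the jobs are dormant points at descriptors that are opened but never closed.
        while (true) {
            System.out.printf("open FDs: %d / max FDs: %d%n",
                    unixOs.getOpenFileDescriptorCount(),
                    unixOs.getMaxFileDescriptorCount());
            Thread.sleep(60_000L);
        }
    }
}
{code}

The same two values are also visible as the OpenFileDescriptorCount and MaxFileDescriptorCount attributes of the java.lang:type=OperatingSystem platform MBean, so JConsole (already used for the attached screenshots) can read them without running extra code on the box.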