[ https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424006#comment-16424006 ]

Alon Galant commented on FLINK-8707:
------------------------------------

Hey, 

I have a similar problem; this is my thread on the Flink user mailing list:

[https://lists.apache.org/thread.html/ce89ed53533ec19ec0a7b74de0f96ac22506d5b2564c2949fd54d6ce@%3Cuser.flink.apache.org%3E]

I've created a whitelist of 16 customer IDs that I filter from the Kafka topic 
into Flink, so at any one time data should be written to roughly that many 
files at once.
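
Roughly, the job looks like this (just a sketch to show the pattern - the Tuple2 element type, the ids, the paths and the inline Bucketer are placeholders, not my actual code):

{code:java}
// Minimal sketch of the whitelist + per-customer bucketing pattern (placeholder names).
import java.util.Arrays;
import java.util.HashSet;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.fs.Path;

public class CustomerWhitelistSketch {

    // events are (customerId, payload) pairs in this sketch
    public static void attachSink(DataStream<Tuple2<String, String>> events) {
        final HashSet<String> whitelist = new HashSet<>(Arrays.asList(
                "customer-01", "customer-02" /* ... 16 ids in total ... */));

        BucketingSink<Tuple2<String, String>> sink =
                new BucketingSink<>("s3a://my-bucket/output");

        // one bucket per customer id, so every writing subtask keeps a part file open per customer
        sink.setBucketer(new Bucketer<Tuple2<String, String>>() {
            @Override
            public Path getBucketPath(Clock clock, Path basePath, Tuple2<String, String> event) {
                return new Path(basePath, event.f0);
            }
        });

        events
            .filter(new FilterFunction<Tuple2<String, String>>() {
                @Override
                public boolean filter(Tuple2<String, String> event) {
                    return whitelist.contains(event.f0);
                }
            })
            .addSink(sink);
    }
}
{code}
So every whitelisted customer gets its own bucket, and each writing subtask keeps a part file open in it.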

 

I attached 3 files:

lsof.txt - the output of $ lsof > lsof.txt

lsofp.txt - the output of $ lsof -p 5514 > lsofp.txt (5514 is the Flink process)

ll.txt - the output of $ ls -la /tmp/hadoop-flink/s3a/ > ll.txt

/tmp/hadoop-flink/s3a/ is the directory Flink uses to store the temporary files 
it works on before uploading them to S3.

I ran these commands on one of my task managers (I run 2 task managers with a 
total of 8 task slots, 4 on each TM).

 

Here are some lsof | wc -l results:

 
{code:bash}
less lsof.txt | wc -l --> 44228
less lsofp.txt | wc -l --> 403
less ll.txt | wc -l --> 64
less lsof.txt | grep output-1620244093829221334.tmp | wc -l --> 108
{code}
(output-1620244093829221334.tmp is just one example of an output file; I think all of them give the same result)
 

 

From ll.txt we can see that there are 4 files for each customerId (for each 
partition), so I guess that every task slot opens its own file.

For each 'output' file there are 108 entries in lsof.

My problem is that I want to be able to handle around 500 customers while 
still using high parallelism.

When I tried to use 32 task slots on 2 TMs, I got more than a million FDs in 
lsof.
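
Just to make the scaling concern concrete, here is a very rough back-of-envelope using the numbers above (my assumption being that the ~108 lsof entries per open part file stay roughly constant as customers and slots grow; the class is only illustrative):

{code:java}
// Back-of-envelope only; assumes ~108 lsof entries per open part file stays roughly constant.
public class FdBackOfEnvelope {
    public static void main(String[] args) {
        long customers = 16;           // whitelisted customer ids today
        long filesPerCustomer = 4;     // from ll.txt: one part file per task slot on this TM
        long entriesPerFile = 108;     // from grepping one output-*.tmp in lsof.txt

        // lsof lines attributable to output files on this TM today
        System.out.println(customers * filesPerCustomer * entriesPerFile);  // 6912 (of the 44228 total)

        // the same pattern at 500 customers, everything else unchanged
        System.out.println(500L * filesPerCustomer * entriesPerFile);       // 216000 per TM

        // and it multiplies again with more task slots per TM, since every slot opens its own file
    }
}
{code}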

This is the exception I got when I enabled all of the IDs:

 
{code:java}
java.lang.RuntimeException: Error parsing YAML configuration.
        at org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:178)
        at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:112)
        at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:79)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopConfiguration(HadoopFileSystem.java:178)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:408)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:351)
        at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
        at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:232)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /srv/flink-1.2.1/conf/flink-conf.yaml (Too many open files)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:146)
        at org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:144)
        ... 14 more
{code}
I increased the ulimit to 500,000 but it's still not enough, and I guess 
needing that many open files is too much anyhow.

 

I'd love to get some help!

Thanks,

Alon

 

[^ll.txt]

[^lsof.txt]

[^lsofp.txt]

> Excessive amount of files opened by flink task manager
> ------------------------------------------------------
>
>                 Key: FLINK-8707
>                 URL: https://issues.apache.org/jira/browse/FLINK-8707
>             Project: Flink
>          Issue Type: Bug
>          Components: TaskManager
>    Affects Versions: 1.3.2
>         Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir:  some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs:  some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory:  some dir...residing on the same box
>            Reporter: Alexander Gardner
>            Assignee: Piotr Nowojski
>            Priority: Critical
>             Fix For: 1.5.0
>
>         Attachments: AfterRunning-3-jobs-Box2-TM-JCONSOLE.png, 
> AfterRunning-3-jobs-TM-FDs-BOX2.jpg, AfterRunning-3-jobs-lsof-p.box2-TM, 
> AfterRunning-3-jobs-lsof.box2-TM, AterRunning-3-jobs-Box1-TM-JCONSOLE.png, 
> box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, box2-taskmgr-lsof, 
> ll.txt, lsof.txt, lsofp.txt
>
>
> The job manager has fewer FDs than the task manager.
>  
> Hi
> A support alert indicated that there were a lot of open files for the boxes 
> running Flink.
> There were 4 flink jobs that were dormant but had consumed a number of msgs 
> from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l       ->  returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l    -> returned 129322 open FDs
> $ lsof -p 12154 | wc -l   -> returned 531 FDs
> There were 228 threads running for the task manager.
>  
> Drilling down a bit further, looking at a_inode and FIFO entries: 
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l  = 200 FDs
> $ /proc/12154/maps = 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced there were 
> also 244 child processes for the task manager process.
> Noticed that in each environment there is a creep of file descriptors... are the 
> above figures deemed excessive for the number of FDs in use? I know Flink uses 
> Netty - is it using a separate Selector for reads & writes? 
> Additionally, does Flink use memory-mapped files or direct ByteBuffers, and are 
> these skewing the number of FDs shown?
> Example of one child process ID 6633:
> java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]
>  java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe
>  java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe
> Lastly, I cannot yet identify the reason for the creep in FDs even when Flink is 
> pretty dormant or has dormant jobs. Production nodes are not experiencing 
> excessive amounts of throughput yet either.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
