Re: Too many open files issue
I don't think that is the issue. I have it set up to run in a thread pool, but I have set the pool size to 1 for this test until I get this resolved. I am having some problems with using the Spark web portal since it picks a random port, and with the way my environment is set up, by the time I have figured out which port it is using the job has finished. But what I did do is add some logging, and I added collecting the RDD record count to make sure the last logging statements were in fact executed after the RDD process ran. I added the logging statements in the job flow:

    val directories = findModifiedFiles()
    directories.foreach(directory => {
      log(s"Starting directory processor for $directory")
      val rdd = sparkContext.newAPIHadoopFile(directory)
        .filter(...)
        .map(...)
        .reduceByKey(...)
      rdd.foreachPartition(iterator => {
        iterator.foreach(tuple => {
          // send data to kafka
        })
      })
      val count = rdd.count
      log(s"Processed $count records for $directory")
      log(s"Finished directory processor for $directory")
    })

This results in these log lines until the "Too many open files in system" errors start happening, after which it only prints the first log line for each iteration (as expected, since it's throwing an exception).
Starting directory processor for /user/oozie/topics/testtopic/hourly/2015/04/15
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/15
Finished directory processor for /user/oozie/topics/testtopic/hourly/2015/04/15
Starting directory processor for /user/oozie/topics/testtopic/hourly/2015/04/16
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/16
Finished directory processor for /user/oozie/topics/testtopic/hourly/2015/04/16
Starting directory processor for /user/oozie/topics/testtopic/hourly/2015/04/17
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/17
Finished directory processor for /user/oozie/topics/testtopic/hourly/2015/04/17
Starting directory processor for /user/oozie/topics/testtopic/hourly/2015/04/18
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/18
Finished directory processor for /user/oozie/topics/testtopic/hourly/2015/04/18
Starting directory processor for /user/oozie/topics/testtopic/hourly/2015/04/19
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/19
Finished directory processor for /user/oozie/topics/testtopic/hourly/2015/04/19
Starting directory processor for /user/oozie/topics/testtopic/hourly/2015/04/20

Just to see what would happen if I made the job run slower and had GC run more frequently (in case it had something to do with that), I added the following to each loop iteration:

    System.gc()
    Thread.sleep(5000)

But besides making the job run a lot longer, it did not change anything.

Sigurd
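(Not from the thread, but the check above can be automated. A minimal sketch of logging the JVM's open-descriptor count per loop iteration by listing /proc/self/fd; this is Linux-only, and the object and method names here are illustrative, not part of the original job.)

```scala
// Minimal sketch: count this JVM's open file descriptors by listing
// /proc/self/fd. Linux-only; elsewhere list() returns null and we
// report -1 instead of failing.
import java.io.File

object FdCount {
  def openFdCount(): Int =
    Option(new File("/proc/self/fd").list()).map(_.length).getOrElse(-1)

  def main(args: Array[String]): Unit =
    // Logging this once per directories.foreach iteration would show
    // directly whether handles accumulate across loop iterations.
    println(s"open fds: ${openFdCount()}")
}
```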
Re: Too many open files issue
Here is the code in which NewHadoopRDD registers a close handler to be called when the task is completed:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136

From my understanding, possibly the reason is that the `foreach` in your implementation is not executing the Spark jobs one by one in the loop as expected; on the contrary, all the jobs are submitted to the DAGScheduler simultaneously. Since each job has no dependency on the others, Spark's scheduler will unwrap the loop and submit the jobs in parallel, so maybe several map stages are running and pending at once, and this runs your node out of file handles.

You could check the Spark web portal to see if there are several map stages running simultaneously, or some of them running while others are pending.

Thanks
Jerry
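(A pure-Scala illustration of the distinction Jerry is describing, with no Spark involved: a loop of blocking calls runs its "jobs" strictly one at a time, while a loop that fires Futures submits them all up front so their resource usage overlaps. All names here are made up for the sketch; `fakeJob` just sleeps as a stand-in for a Spark action.)

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SubmissionOrder {
  private var inFlight = 0
  private var maxInFlight = 0
  private val lock = new Object

  def fakeJob(): Unit = {
    lock.synchronized { inFlight += 1; maxInFlight = maxInFlight.max(inFlight) }
    Thread.sleep(50) // stand-in for a Spark action doing real work
    lock.synchronized { inFlight -= 1 }
  }

  def run(): (Int, Int) = {
    // Sequential: each "job" finishes before the next starts, so the
    // peak number in flight is 1 and resources are released between jobs.
    (1 to 4).foreach(_ => fakeJob())
    val seqMax = maxInFlight

    lock.synchronized { maxInFlight = 0 }
    // Concurrent: all "jobs" submitted at once -- the overlap Jerry
    // suspects might be happening at the scheduler level.
    Await.result(Future.sequence((1 to 4).map(_ => Future(fakeJob()))), 10.seconds)
    (seqMax, maxInFlight)
  }

  def main(args: Array[String]): Unit = {
    val (seqMax, parMax) = run()
    println(s"max jobs in flight: sequential=$seqMax concurrent=$parMax")
  }
}
```

(Note that Sigurd's log output above, with each "Finished" line preceding the next "Starting" line, is evidence for the sequential pattern.)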
Re: Too many open files issue
ah, now that does sound suspicious...

On 2 Sep 2015, at 14:09, Sigurd Knippenberg wrote:

> It still doesn't make sense to me that the Spark job doesn't release its file handles until the end of the job instead of doing that while my loop iterates.
Re: Too many open files issue
Yep. I know. It was set to 32K when I ran this test. If I bump it to 64K the issue goes away. It still doesn't make sense to me that the Spark job doesn't release its file handles until the end of the job instead of doing that while my loop iterates.

Sigurd
Re: Too many open files issue
On 31 Aug 2015, at 19:49, Sigurd Knippenberg wrote:

> I know I can adjust the max open files allowed by the OS but I'd rather fix the underlying issue.

bumping up the OS handle limits is step #1 of installing a hadoop cluster

https://wiki.apache.org/hadoop/TooManyOpenFiles
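(A small sketch related to Steve's point: it can help to log the descriptor limit the JVM actually inherited, alongside current usage, at job start. `com.sun.management.UnixOperatingSystemMXBean` is a HotSpot-specific interface available on Linux/macOS JVMs but not guaranteed everywhere, hence the pattern match; the object name is illustrative.)

```scala
import java.lang.management.ManagementFactory
import com.sun.management.UnixOperatingSystemMXBean

object FdLimits {
  // Returns (currently open descriptors, max descriptors) when the
  // platform MXBean exposes them, None otherwise (e.g. Windows).
  def limits(): Option[(Long, Long)] =
    ManagementFactory.getOperatingSystemMXBean match {
      case os: UnixOperatingSystemMXBean =>
        Some((os.getOpenFileDescriptorCount, os.getMaxFileDescriptorCount))
      case _ => None
    }

  def main(args: Array[String]): Unit =
    limits() match {
      case Some((open, max)) => println(s"fds: open=$open max=$max")
      case None              => println("fd counts not exposed on this JVM")
    }
}
```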
Too many open files issue
I am running into a 'too many open files' issue, and before I posted this I searched the web to see if anyone already had a solution to my particular problem, but I did not see anything that helped. I know I can adjust the max open files allowed by the OS, but I'd rather fix the underlying issue.

In HDFS I have a directory that contains hourly files in a directory structure like this: directoryname/hourly/2015/06/02/13 (where the numbers at the end are the date in YYYY/MM/DD/HH format). I have a Spark job that roughly does the following:

    val directories = findModifiedFiles()
    directories.foreach(directory => {
      sparkContext.newAPIHadoopFile(directory)
        .filter(...)
        .map(...)
        .reduceByKey(...)
        .foreachPartition(iterator => {
          iterator.foreach(tuple => {
            // send data to kafka
          })
        })
    })

If there are only a few directories that have been modified, this works pretty well. But when I have the job reprocess all the data (I have 350M of test data that pretty much has data for each hour of each day for a full year), I run out of file handles. I executed the test on a test cluster of 2 Hadoop slave nodes that each run the HDFS data node and the YARN node manager.
When I run "lsof -p" on the Spark processes, I see a lot of the following types of open files:

java 21196 yarn 3268r REG 8,16 139 533320 /hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir17/blk_1073746324_5500.meta
java 21196 yarn 3269r REG 8,16 15004 533515 /hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir17/blk_1073746422
java 21196 yarn 3270r REG 8,16 127 533516 /hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir17/blk_1073746422_5598.meta
java 21196 yarn 3271r REG 8,16 15583 534081 /hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir19/blk_1073746704
java 21196 yarn 3272r REG 8,16 131 534082 /hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir19/blk_1073746704_5880.meta

When I watch the process run, I can see that the number of those open files keeps increasing while it is processing directories, until it runs out of file handles. (It then drops to zero and climbs again until it runs out once more, but that is because YARN retries the job.) It basically ends up opening about 4500 file handles to those files per node.

As I said, I know that I can increase the number of open file handles, and I will do that, but in my opinion the count should not keep growing. I would have thought that when I was done with an RDD, Spark would close all the resources it opened for it, so that the file handles would be closed after each iteration of the directories.foreach loop. I looked for a close() method or something similar on the RDD but couldn't find one. Am I doing something that is causing Spark not to close the file handles? Should I write this job differently?

Thanks,
Sigurd
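(A hedged helper, not part of the job, for making sense of lsof output like the above: take the path column of each line and count entries per leading directory, so that leaked datanode block files stand out in the tally. The object name and the shortened sample paths are made up for the sketch.)

```scala
object LsofTally {
  // Bucket each open-file path by its first four directory components
  // and count how many descriptors fall in each bucket.
  def tally(lsofLines: Seq[String]): Map[String, Int] =
    lsofLines
      .map(_.split("\\s+").last)                   // last column is the path
      .groupBy(_.split("/").take(5).mkString("/")) // leading directories
      .map { case (prefix, paths) => prefix -> paths.size }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      "java 21196 yarn 3268r REG 8,16 139 533320 /hadoop/hdfs/data/current/blk_a.meta",
      "java 21196 yarn 3269r REG 8,16 150 533515 /hadoop/hdfs/data/current/blk_b"
    )
    tally(sample).foreach { case (prefix, n) => println(f"$n%5d  $prefix") }
  }
}
```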