I don't think that is the issue. I have it set up to run in a thread pool,
but I have set the pool size to 1 for this test until I get this resolved.
I am having trouble using the Spark web portal because it picks a random
port, and with the way my environment is set up, by the time I have figured
out which port it is using the job has finished. What I did do is add some
logging, including a record count on the RDD, to make sure the last logging
statements were in fact executed after the RDD processing ran. I added the
logging statements in the job flow:

val directories = findModifiedFiles()
directories.foreach(directory => {
    log(s"Starting directory processor for $directory")
    // key/value/input format type parameters omitted
    val rdd = sparkContext.newAPIHadoopFile(directory)

    rdd.foreachPartition(iterator => {
        iterator.foreach(tuple => {
            // send data to kafka
        })
    })

    // count is an action, so this line blocks until the job has run
    val count = rdd.count
    log(s"Processed $count records for $directory")
    log(s"Finished directory processor for $directory")
})

This produced the log lines below until the "Too many open files in system"
errors started, after which it printed only the first log line for each
iteration (as expected, since it's throwing an exception).

Starting directory processor for
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/15
Finished directory processor for
Starting directory processor for
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/16
Finished directory processor for
Starting directory processor for
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/17
Finished directory processor for
Starting directory processor for
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/18
Finished directory processor for
Starting directory processor for
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/19
Finished directory processor for
Starting directory processor for

Just to see what would happen if I made the job run slower and had GC run
more frequently (in case it had something to do with that), I added the
following to each loop iteration:

But besides making the job run a lot longer it did not change anything.
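
One further check that might help here is logging the JVM's open file
descriptor count next to the existing log lines, to see whether handles are
released between iterations. A minimal sketch, assuming a HotSpot/OpenJDK JVM
on a Unix-like OS; FdProbe and openFdCount are hypothetical names for
illustration, not part of Spark:

```scala
import java.lang.management.ManagementFactory
import com.sun.management.UnixOperatingSystemMXBean

// Hypothetical helper (not a Spark API): reports how many file
// descriptors the current JVM process has open.
object FdProbe {
  // Returns None when the platform bean does not expose the count
  // (for example on non-Unix JVMs).
  def openFdCount(): Option[Long] =
    ManagementFactory.getOperatingSystemMXBean match {
      case unix: UnixOperatingSystemMXBean =>
        Some(unix.getOpenFileDescriptorCount)
      case _ =>
        None
    }
}
```

Calling log(s"Open fds after $directory: ${FdProbe.openFdCount()}") at the end
of each iteration would show whether the count keeps climbing. It only
measures the JVM it runs in, so outside local mode the same call would have to
run inside foreachPartition to see the executors' handles.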


On Wed, Sep 2, 2015 at 9:40 AM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> Here is the code in which NewHadoopRDD registers a close handler that is
> called when the task completes (
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136
> ).
> From my understanding, the likely reason is that the `foreach` code in
> your implementation does not execute the Spark jobs one by one in the loop
> as expected. Instead, all the jobs are submitted to the DAGScheduler
> simultaneously: since no job depends on the others, Spark's scheduler
> will unwrap the loop and submit the jobs in parallel, so several map
> stages may be running or pending at once, which makes your node run out
> of file handles.
> You could check the Spark web portal to see if there are several map
> stages running simultaneously, or some of them running while others are
> pending.
> Thanks
> Jerry
> On Wed, Sep 2, 2015 at 9:09 PM, Sigurd Knippenberg <sig...@knippenberg.com
> > wrote:
>> Yep. I know. It was set to 32K when I ran this test. If I bump it to
>> 64K the issue goes away. It still doesn't make sense to me that the Spark
>> job doesn't release its file handles until the end of the job instead of
>> doing that while my loop iterates.
>> Sigurd
>> On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran <ste...@hortonworks.com>
>> wrote:
>>> On 31 Aug 2015, at 19:49, Sigurd Knippenberg <sig...@knippenberg.com>
>>> wrote:
>>> I know I can adjust the max open files allowed by the OS but I'd rather
>>> fix the underlying issue.
>>> Bumping up the OS handle limits is step #1 of installing a Hadoop cluster:
>>> https://wiki.apache.org/hadoop/TooManyOpenFiles
