Re: Too many open files, why is changing ulimit not taking effect?

2016-02-10 Thread Michael Diamant
If you are using systemd, you will need to specify the limit in the service
file.  I had run into this problem and discovered the solution from the
following references:
* https://bugzilla.redhat.com/show_bug.cgi?id=754285#c1
* http://serverfault.com/a/678861
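
For reference, a minimal sketch of such a systemd drop-in (the unit name
spark-worker.service and the value are illustrative, not from the thread):

    # /etc/systemd/system/spark-worker.service.d/limits.conf
    [Service]
    LimitNOFILE=65536

After adding it, run "systemctl daemon-reload" and restart the service so
the new limit takes effect.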

On Fri, Feb 5, 2016 at 1:18 PM, Nirav Patel  wrote:

> For CentOS there's also /etc/security/limits.d/90-nproc.conf that may
> need modification.
>
> Services that you expect to use the new limits need to be restarted. The
> simplest thing to do is to reboot the machine.
>
> On Fri, Feb 5, 2016 at 3:59 AM, Ted Yu  wrote:
>
>> bq. and *"session required pam_limits.so"*.
>>
>> What was the second file you modified ?
>>
>> Did you make the change on all the nodes ?
>>
>> Please see the verification step in
>> https://easyengine.io/tutorials/linux/increase-open-files-limit/
>>
>> On Fri, Feb 5, 2016 at 1:42 AM, Mohamed Nadjib MAMI wrote:
>>
>>> Hello all,
>>>
>>> I'm getting the famous *java.io.FileNotFoundException: ... (Too many
>>> open files)* exception. What seemed to have helped other people out
>>> hasn't helped me. I tried to set the ulimit via the command line ("ulimit
>>> -n"), then I tried to add the following lines to the
>>> "/etc/security/limits.conf" file:
>>>
>>> * - nofile 100
>>> root soft nofile 100
>>> root hard nofile 100
>>> hduser soft nofile 100
>>> hduser hard nofile 100
>>>
>>> ...then I added the line "session required pam_limits.so" to the two
>>> files "/etc/pam.d/common-session" and "session required
>>> pam_limits.so". Then I logged out and back in. First, I tried only the
>>> first line (* - nofile 100), then added the 2nd and the 3rd
>>> (root...), then added the last two lines (hduser...), no effect. Weirdly
>>> enough, when I check with the command "ulimit -n" it returns the
>>> correct value of 100.
>>>
>>> I then added "ulimit -n 100" to "spark-env.sh" on the master
>>> and on each of my workers, no effect.
>>>
>>> What else could it be besides changing the ulimit setting? If it's only
>>> that, what could cause Spark to ignore it?
>>>
>>> I'll appreciate any help in advance.
>>>
>>> --
>>> PhD Student - EIS Group - Bonn University, Germany.
>>> +49 1575 8482232
>>>
>>>
>>
>
>
>


Re: Too many open files, why is changing ulimit not taking effect?

2016-02-05 Thread Nirav Patel
For CentOS there's also /etc/security/limits.d/90-nproc.conf that may need
modification.

Services that you expect to use the new limits need to be restarted. The
simplest thing to do is to reboot the machine.
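
For reference, a sketch of the kind of entries involved (the account name
sparkuser and the values are placeholders, not from the thread):

    # /etc/security/limits.d/90-nproc.conf
    sparkuser  soft  nproc  32768
    sparkuser  hard  nproc  32768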

On Fri, Feb 5, 2016 at 3:59 AM, Ted Yu  wrote:

> bq. and *"session required pam_limits.so"*.
>
> What was the second file you modified ?
>
> Did you make the change on all the nodes ?
>
> Please see the verification step in
> https://easyengine.io/tutorials/linux/increase-open-files-limit/
>
> On Fri, Feb 5, 2016 at 1:42 AM, Mohamed Nadjib MAMI 
> wrote:
>
>> Hello all,
>>
>> I'm getting the famous *java.io.FileNotFoundException: ... (Too many
>> open files)* exception. What seemed to have helped other people out
>> hasn't helped me. I tried to set the ulimit via the command line ("ulimit
>> -n"), then I tried to add the following lines to the
>> "/etc/security/limits.conf" file:
>>
>> * - nofile 100
>> root soft nofile 100
>> root hard nofile 100
>> hduser soft nofile 100
>> hduser hard nofile 100
>>
>> ...then I added the line "session required pam_limits.so" to the two
>> files "/etc/pam.d/common-session" and "session required
>> pam_limits.so". Then I logged out and back in. First, I tried only the
>> first line (* - nofile 100), then added the 2nd and the 3rd
>> (root...), then added the last two lines (hduser...), no effect. Weirdly
>> enough, when I check with the command "ulimit -n" it returns the
>> correct value of 100.
>>
>> I then added "ulimit -n 100" to "spark-env.sh" on the master
>> and on each of my workers, no effect.
>>
>> What else could it be besides changing the ulimit setting? If it's only
>> that, what could cause Spark to ignore it?
>>
>> I'll appreciate any help in advance.
>>
>> --
>> PhD Student - EIS Group - Bonn University, Germany.
>> +49 1575 8482232
>>
>>
>



Re: Too many open files, why is changing ulimit not taking effect?

2016-02-05 Thread Ted Yu
bq. and *"session required pam_limits.so"*.

What was the second file you modified ?

Did you make the change on all the nodes ?

Please see the verification step in
https://easyengine.io/tutorials/linux/increase-open-files-limit/
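
A quick verification sketch (CoarseGrainedExecutorBackend is Spark's
standard executor class name; the pgrep pattern is an assumption about how
your executors show up in the process list):

    # limit seen by a fresh login shell for the affected user
    su - hduser -c 'ulimit -Sn; ulimit -Hn'

    # limit actually applied to a running executor JVM
    cat /proc/$(pgrep -f CoarseGrainedExecutorBackend | head -n1)/limits | grep 'open files'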

On Fri, Feb 5, 2016 at 1:42 AM, Mohamed Nadjib MAMI 
wrote:

> Hello all,
>
> I'm getting the famous *java.io.FileNotFoundException: ... (Too many open
> files)* exception. What seemed to have helped other people out hasn't
> helped me. I tried to set the ulimit via the command line ("ulimit -n"),
> then I tried to add the following lines to the
> "/etc/security/limits.conf" file:
>
> * - nofile 100
> root soft nofile 100
> root hard nofile 100
> hduser soft nofile 100
> hduser hard nofile 100
>
> ...then I added the line "session required pam_limits.so" to the two
> files "/etc/pam.d/common-session" and "session required pam_limits.so".
> Then I logged out and back in. First, I tried only the first line (* -
> nofile 100), then added the 2nd and the 3rd (root...), then added
> the last two lines (hduser...), no effect. Weirdly enough, when I check
> with the command "ulimit -n" it returns the correct value of 100.
>
> I then added "ulimit -n 100" to "spark-env.sh" on the master and
> on each of my workers, no effect.
>
> What else could it be besides changing the ulimit setting? If it's only
> that, what could cause Spark to ignore it?
>
> I'll appreciate any help in advance.
>
> --
> PhD Student - EIS Group - Bonn University, Germany.
> +49 1575 8482232
>
>


Re: "Too many open files" exception on reduceByKey

2015-10-11 Thread Tian Zhang
It turns out that Mesos can overwrite the OS ulimit -n setting, so we have
increased the ulimit -n setting on the Mesos slaves.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p25019.html



Re: "Too many open files" exception on reduceByKey

2015-10-09 Thread tian zhang
You are right, I did find that Mesos overwrites this to a smaller number.
So we will modify that and try to run again. Thanks!
Tian


On Thursday, October 8, 2015 4:18 PM, DB Tsai wrote:

Try running the following to see the actual ulimit. We found that Mesos
overrides the ulimit, which causes the issue.

import sys.process._
val p = 1 to 100
val rdd = sc.parallelize(p, 100)
val a = rdd.map(x => Seq("sh", "-c", "ulimit -n").!!.toDouble.toLong).collect



Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D

On Thu, Oct 8, 2015 at 3:22 PM, Tian Zhang  wrote:

I hit this issue with a Spark 1.3.0 stateful application (using the
updateStateByKey function) on Mesos. It fails after running fine for about
24 hours. The error stack trace is below. I checked ulimit -n and we have
very large numbers set on the machines.
What else can be wrong?
15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage
113727.0 (TID 833758, ip-10-112-10-221.ec2.internal):
java.io.FileNotFoundException:
/media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index
(Too many open files)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
        at
org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85)
        at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p24985.html





  

Re: "Too many open files" exception on reduceByKey

2015-10-08 Thread Tian Zhang
I hit this issue with a Spark 1.3.0 stateful application (using the
updateStateByKey function) on Mesos. It fails after running fine for about
24 hours. The error stack trace is below. I checked ulimit -n and we have
very large numbers set on the machines.
What else can be wrong?
15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage
113727.0 (TID 833758, ip-10-112-10-221.ec2.internal):
java.io.FileNotFoundException:
/media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index
(Too many open files)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
at
org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p24985.html



Re: "Too many open files" exception on reduceByKey

2015-10-08 Thread DB Tsai
Try running the following to see the actual ulimit. We found that Mesos
overrides the ulimit, which causes the issue.

import sys.process._
val p = 1 to 100
val rdd = sc.parallelize(p, 100)
val a = rdd.map(x => Seq("sh", "-c", "ulimit -n").!!.toDouble.toLong).collect




Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D


On Thu, Oct 8, 2015 at 3:22 PM, Tian Zhang  wrote:

> I hit this issue with a Spark 1.3.0 stateful application (using the
> updateStateByKey function) on Mesos. It fails after running fine for about
> 24 hours. The error stack trace is below. I checked ulimit -n and we have
> very large numbers set on the machines.
> What else can be wrong?
> 15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage
> 113727.0 (TID 833758, ip-10-112-10-221.ec2.internal):
> java.io.FileNotFoundException:
>
> /media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index
> (Too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
> at
>
> org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85)
> at
>
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p24985.html
>
>


Re: Too many open files issue

2015-09-03 Thread Sigurd Knippenberg
I don't think that is the issue. I have it set up to run in a thread pool,
but I have set the pool size to 1 for this test until I get this resolved.
I am having some problems with using the Spark web portal since it picks a
random port, and with the way my environment is set up, by the time I
have figured out which port it is using the job has finished. But what I
did do is add some logging, and I added collecting the RDD record count to
make sure the last logging statements were in fact executed after the RDD
process ran. I added the logging statements in the job flow:

val directories = findModifiedFiles()
directories.foreach(directory => {
  log(s"Starting directory processor for $directory")
  val rdd = sparkContext.newAPIHadoopFile(directory)
    .filter(...)
    .map(...)
    .reduceByKey(...)

  rdd.foreachPartition(iterator => {
    iterator.foreach(tuple => {
      // send data to kafka
    })
  })

  val count = rdd.count
  log(s"Processed $count records for $directory")
  log(s"Finished directory processor for $directory")
})

This resulted in the log lines below until the "Too many open files in system"
errors started happening, after which it only printed the first log line for
each iteration (as expected, since it's throwing an exception):

Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/15
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/15
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/15
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/16
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/16
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/16
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/17
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/17
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/17
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/18
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/18
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/18
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/19
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/19
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/19
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/20

Just to see what would happen if I made the job run slower and had GC run
more frequently (in case it had something to do with that), I added the
following to each loop iteration:
  System.gc()
  Thread.sleep(5000)

But besides making the job run a lot longer it did not change anything.

Sigurd


On Wed, Sep 2, 2015 at 9:40 AM, Saisai Shao  wrote:

> Here is the code in which NewHadoopRDD registers a close handler that is
> called when the task is completed (
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136
> ).
>
> From my understanding, the reason may be that this `foreach` code in
> your implementation does not execute the Spark jobs one by one in the loop
> as expected; on the contrary, all the jobs are submitted to the DAGScheduler
> simultaneously. Since each job has no dependency on the others, Spark's
> scheduler will unwrap the loop and submit the jobs in parallel, so maybe
> several map stages are running and pending, and this runs your node out of
> file handles.
>
> You could check the Spark web portal to see whether several map stages are
> running simultaneously, or some of them are running while others are
> pending.
>
> Thanks
> Jerry
>
>
> On Wed, Sep 2, 2015 at 9:09 PM, Sigurd Knippenberg wrote:
>
>> Yep. I know. It was set to 32K when I ran this test. If I bump it to
>> 64K the issue goes away. It still doesn't make sense to me that the Spark
>> job doesn't release its file handles until the end of the job instead of
>> doing that while my loop iterates.
>>
>> Sigurd
>>
>> On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran 
>> wrote:
>>
>>>
>>> On 31 Aug 2015, at 19:49, Sigurd Knippenberg 
>>> wrote:
>>>
>>> I know I can adjust the max open files allowed by the OS but I'd rather
>>> fix the underlying issue.
>>>
>>>
>>>
>>> bumping up the OS handle limits is step #1 of installing a hadoop cluster
>>>
>>> https://wiki.apache.org/hadoop/TooManyOpenFiles
>>>
>>
>>
>


Re: Too many open files issue

2015-09-02 Thread Steve Loughran

On 31 Aug 2015, at 19:49, Sigurd Knippenberg wrote:

I know I can adjust the max open files allowed by the OS but I'd rather fix the
underlying issue.


bumping up the OS handle limits is step #1 of installing a hadoop cluster

https://wiki.apache.org/hadoop/TooManyOpenFiles


Re: Too many open files issue

2015-09-02 Thread Saisai Shao
Here is the code in which NewHadoopRDD registers a close handler that is
called when the task is completed (
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136
).

From my understanding, the reason may be that this `foreach` code in
your implementation does not execute the Spark jobs one by one in the loop
as expected; on the contrary, all the jobs are submitted to the DAGScheduler
simultaneously. Since each job has no dependency on the others, Spark's
scheduler will unwrap the loop and submit the jobs in parallel, so maybe
several map stages are running and pending, and this runs your node out of
file handles.

You could check the Spark web portal to see whether several map stages are
running simultaneously, or some of them are running while others are
pending.

Thanks
Jerry
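
If the web portal is hard to reach (Sigurd mentions elsewhere in the thread
that it picks a random port), roughly the same check can be done from the
driver; a sketch, assuming a Spark version that exposes SparkStatusTracker:

    // poll the scheduler from the driver instead of the web UI
    val tracker = sc.statusTracker
    println(s"active jobs:   ${tracker.getActiveJobIds.mkString(", ")}")
    println(s"active stages: ${tracker.getActiveStageIds.mkString(", ")}")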


On Wed, Sep 2, 2015 at 9:09 PM, Sigurd Knippenberg 
wrote:

> Yep. I know. It was set to 32K when I ran this test. If I bump it to 64K
> the issue goes away. It still doesn't make sense to me that the Spark job
> doesn't release its file handles until the end of the job instead of doing
> that while my loop iterates.
>
> Sigurd
>
> On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran 
> wrote:
>
>>
>> On 31 Aug 2015, at 19:49, Sigurd Knippenberg 
>> wrote:
>>
>> I know I can adjust the max open files allowed by the OS but I'd rather
>> fix the underlying issue.
>>
>>
>>
>> bumping up the OS handle limits is step #1 of installing a hadoop cluster
>>
>> https://wiki.apache.org/hadoop/TooManyOpenFiles
>>
>
>


Re: Too many open files issue

2015-09-02 Thread Steve Loughran
ah, now that does sound suspicious...

On 2 Sep 2015, at 14:09, Sigurd Knippenberg wrote:

Yep. I know. It was set to 32K when I ran this test. If I bump it to 64K the
issue goes away. It still doesn't make sense to me that the Spark job doesn't
release its file handles until the end of the job instead of doing that while
my loop iterates.

Sigurd

On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran wrote:

On 31 Aug 2015, at 19:49, Sigurd Knippenberg wrote:

I know I can adjust the max open files allowed by the OS but I'd rather fix the
underlying issue.


bumping up the OS handle limits is step #1 of installing a hadoop cluster

https://wiki.apache.org/hadoop/TooManyOpenFiles




Re: Too many open files issue

2015-09-02 Thread Sigurd Knippenberg
Yep. I know. It was set to 32K when I ran this test. If I bump it to 64K
the issue goes away. It still doesn't make sense to me that the Spark job
doesn't release its file handles until the end of the job instead of doing
that while my loop iterates.

Sigurd

On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran 
wrote:

>
> On 31 Aug 2015, at 19:49, Sigurd Knippenberg 
> wrote:
>
> I know I can adjust the max open files allowed by the OS but I'd rather
> fix the underlying issue.
>
>
>
> bumping up the OS handle limits is step #1 of installing a hadoop cluster
>
> https://wiki.apache.org/hadoop/TooManyOpenFiles
>


Re: Too many open files

2015-07-29 Thread Igor Berman
You probably should increase the file handle limit for the user that all the
processes are running as (Spark master and workers), e.g.:
http://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/
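
A sketch of the corresponding limits.conf entries (the user name and the
values are placeholders):

    # /etc/security/limits.conf
    sparkuser  soft  nofile  65536
    sparkuser  hard  nofile  65536

The Spark daemons then need to be restarted from a session that has picked
up the new limit.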

On 29 July 2015 at 18:39, saif.a.ell...@wellsfargo.com wrote:

  Hello,

 I’ve seen a couple emails on this issue but could not find anything to
 solve my situation.

 Tried to reduce the partitioning level, enable consolidateFiles and
 increase the sizeInFlight limit, but still no help. Spill manager is sort,
 which is the default, any advice?

 15/07/29 10:37:01 WARN TaskSetManager: Lost task 34.0 in stage 11.0 (TID
 331, localhost): FetchFailed(BlockManagerId(driver, localhost, 43437),
 shuffleId=3, mapId=0, reduceId=34, message=
 org.apache.spark.shuffle.FetchFailedException:
 /tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/0d/shuffle_3_0_0.index
 (Too many open files)
 ..
 ..
 15/07/29 10:37:01 INFO Executor: Executor is trying to kill task 9.0 in
 stage 11.0 (TID 306)
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 20
 in stage 11.0 failed 1 times, most recent failure: Lost task 20.0 in stage
 11.0 (TID 317, localhost): java.io.FileNotFoundException:
 /tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/1b/temp_shuffle_a3a9815a-677a-4342-94a2-1e083d758bcc
 (Too many open files)

 my fs is ext4 and currently ulimit -n is 1024

 Thanks
 Saif
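
For reference, the shuffle settings Saif mentions map to these Spark
properties (a sketch; property names as of Spark 1.x, values illustrative):

    # conf/spark-defaults.conf
    spark.shuffle.consolidateFiles   true
    spark.reducer.maxSizeInFlight    96m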




RE: Too many open files

2015-07-29 Thread Saif.A.Ellafi
Thank you both, I will take a look, but:


1.   For high-shuffle tasks, is it right for the system to have the sizes
and thresholds this high? I hope there are no bad consequences.

2.   I will try to work around admin access and see if I can get anything
with only user rights.

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Wednesday, July 29, 2015 12:59 PM
To: Ellafi, Saif A.
Cc: user@spark.apache.org
Subject: Re: Too many open files

Please increase limit for open files:

http://stackoverflow.com/questions/34588/how-do-i-change-the-number-of-open-files-limit-in-linux


On Jul 29, 2015, at 8:39 AM, saif.a.ell...@wellsfargo.com wrote:
Hello,

I’ve seen a couple emails on this issue but could not find anything to solve my 
situation.

Tried to reduce the partitioning level, enable consolidateFiles and increase 
the sizeInFlight limit, but still no help. Spill manager is sort, which is the 
default, any advice?

15/07/29 10:37:01 WARN TaskSetManager: Lost task 34.0 in stage 11.0 (TID 331, 
localhost): FetchFailed(BlockManagerId(driver, localhost, 43437), shuffleId=3, 
mapId=0, reduceId=34, message=
org.apache.spark.shuffle.FetchFailedException: 
/tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/0d/shuffle_3_0_0.index
 (Too many open files)
..
..
15/07/29 10:37:01 INFO Executor: Executor is trying to kill task 9.0 in stage 
11.0 (TID 306)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in 
stage 11.0 failed 1 times, most recent failure: Lost task 20.0 in stage 11.0 
(TID 317, localhost): java.io.FileNotFoundException: 
/tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/1b/temp_shuffle_a3a9815a-677a-4342-94a2-1e083d758bcc
 (Too many open files)

my fs is ext4 and currently ulimit -n is 1024

Thanks
Saif



Re: Too many open files

2015-07-29 Thread Ted Yu
Please increase limit for open files:

http://stackoverflow.com/questions/34588/how-do-i-change-the-number-of-open-files-limit-in-linux



 On Jul 29, 2015, at 8:39 AM, saif.a.ell...@wellsfargo.com wrote:
 
 Hello,
  
 I’ve seen a couple emails on this issue but could not find anything to solve 
 my situation.
  
 Tried to reduce the partitioning level, enable consolidateFiles and increase 
 the sizeInFlight limit, but still no help. Spill manager is sort, which is 
 the default, any advice?
  
 15/07/29 10:37:01 WARN TaskSetManager: Lost task 34.0 in stage 11.0 (TID 331, 
 localhost): FetchFailed(BlockManagerId(driver, localhost, 43437), 
 shuffleId=3, mapId=0, reduceId=34, message=
 org.apache.spark.shuffle.FetchFailedException: 
 /tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/0d/shuffle_3_0_0.index
  (Too many open files)
 ..
 ..
 15/07/29 10:37:01 INFO Executor: Executor is trying to kill task 9.0 in stage 
 11.0 (TID 306)
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in 
 stage 11.0 failed 1 times, most recent failure: Lost task 20.0 in stage 11.0 
 (TID 317, localhost): java.io.FileNotFoundException: 
 /tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/1b/temp_shuffle_a3a9815a-677a-4342-94a2-1e083d758bcc
  (Too many open files)
  
 my fs is ext4 and currently ulimit -n is 1024
  
 Thanks
 Saif
  


RE: Too many open files

2015-07-29 Thread Paul Röwer
Maybe you forgot to close a reader or writer object.

On 29 July 2015 at 18:04:59 CEST, saif.a.ell...@wellsfargo.com wrote:
Thank you both, I will take a look, but


1.   For high-shuffle tasks, is this right for the system to have
the size and thresholds high? I hope there is no bad consequences.

2.   I will try to overlook admin access and see if I can get
anything with only user rights

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Wednesday, July 29, 2015 12:59 PM
To: Ellafi, Saif A.
Cc: user@spark.apache.org
Subject: Re: Too many open files

Please increase limit for open files:

http://stackoverflow.com/questions/34588/how-do-i-change-the-number-of-open-files-limit-in-linux


On Jul 29, 2015, at 8:39 AM, saif.a.ell...@wellsfargo.com wrote:
Hello,

I’ve seen a couple emails on this issue but could not find anything to
solve my situation.

Tried to reduce the partitioning level, enable consolidateFiles and
increase the sizeInFlight limit, but still no help. Spill manager is
sort, which is the default, any advice?

15/07/29 10:37:01 WARN TaskSetManager: Lost task 34.0 in stage 11.0
(TID 331, localhost): FetchFailed(BlockManagerId(driver, localhost,
43437), shuffleId=3, mapId=0, reduceId=34, message=
org.apache.spark.shuffle.FetchFailedException:
/tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/0d/shuffle_3_0_0.index
(Too many open files)
..
..
15/07/29 10:37:01 INFO Executor: Executor is trying to kill task 9.0 in
stage 11.0 (TID 306)
org.apache.spark.SparkException: Job aborted due to stage failure: Task
20 in stage 11.0 failed 1 times, most recent failure: Lost task 20.0 in
stage 11.0 (TID 317, localhost): java.io.FileNotFoundException:
/tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/1b/temp_shuffle_a3a9815a-677a-4342-94a2-1e083d758bcc
(Too many open files)

my fs is ext4 and currently ulimit -n is 1024

Thanks
Saif

-- 
This message was sent from my Android mobile phone with K-9 Mail.

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-30 Thread Bill Jay
The data ingestion is in the outermost portion of the foreachRDD block.
Although I now close the statement of the jdbc connection, the same exception
happened again. It seems it is not related to the data ingestion part.

On Wed, Apr 29, 2015 at 8:35 PM, Cody Koeninger c...@koeninger.org wrote:

 Use lsof to see what files are actually being held open.

 That stacktrace looks to me like it's from the driver, not executors.
 Where in foreach is it being called?  The outermost portion of foreachRDD
 runs in the driver, the innermost portion runs in the executors.  From the
 docs:

 https://spark.apache.org/docs/latest/streaming-programming-guide.html

 dstream.foreachRDD { rdd =>
   val connection = createNewConnection()  // executed at the driver
   rdd.foreach { record =>
     connection.send(record) // executed at the worker
   }
 }


 @td I've specifically looked at kafka socket connections for the standard
 1.3 code vs my branch that has cached connections.  The standard
 non-caching code has very short lived connections.  I've had jobs running
 for a month at a time, including ones writing to mysql.  Not saying it's
 impossible, but I'd think we need some evidence before speculating this has
 anything to do with it.


 On Wed, Apr 29, 2015 at 6:50 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 This function is called in foreachRDD. I think it should be running in
 the executors. I added the statement.close() in the code and it is running. I
 will let you know if this fixes the issue.



 On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com
 wrote:

 Is the function ingestToMysql running on the driver or on the executors?
 Accordingly you can try debugging while running in a distributed manner,
 with and without calling the function.

 If you don't get too many open files without calling ingestToMysql(),
 the problem is likely to be in ingestToMysql().
 If you get the problem even without calling ingestToMysql(), then the
 problem may be in Kafka. If the problem is occurring in the driver, then it's
 the DirectKafkaInputDStream code. If the problem is occurring in the
 executor, then the problem is in KafkaRDD.

 TD

 On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

 Maybe add statement.close() in finally block ?

 Streaming / Kafka experts may have better insight.

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

 Based on my understanding, the connector to Kafka should not open so
 many files. Do you think there is possible socket leakage? BTW, in every
 batch which is 5 seconds, I output some results to mysql:

   def ingestToMysql(data: Array[String]) {
     val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
     var sql = "insert into loggingserver1 values "
     data.foreach(line => sql += line)
     sql = sql.dropRight(1)
     sql += ";"
     logger.info(sql)
     var conn: java.sql.Connection = null
     try {
       conn = DriverManager.getConnection(url)
       val statement = conn.createStatement()
       statement.executeUpdate(sql)
     } catch {
       case e: Exception => logger.error(e.getMessage())
     } finally {
       if (conn != null) {
         conn.close
       }
     }
   }

 I am not sure whether the leakage originates from Kafka connector or
 the sql connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit
 */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
  wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka
 in the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark
 log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException:
 Too many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-30 Thread Cody Koeninger
Did you use lsof to see what files were opened during the job?
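
A sketch of what that inspection might look like (<pid> is whatever jps or
ps reports for the driver or an executor):

    lsof -p <pid> | wc -l               # total descriptors held open
    lsof -p <pid> | awk '{print $5}' | sort | uniq -c | sort -rn   # grouped by type (REG, IPv4, ...)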

On Thu, Apr 30, 2015 at 1:05 PM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 The data ingestion is in the outermost portion of the foreachRDD block.
 Although I now close the statement of the jdbc connection, the same exception
 happened again. It seems it is not related to the data ingestion part.

 On Wed, Apr 29, 2015 at 8:35 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Use lsof to see what files are actually being held open.

 That stacktrace looks to me like it's from the driver, not executors.
 Where in foreach is it being called?  The outermost portion of foreachRDD
 runs in the driver, the innermost portion runs in the executors.  From the
 docs:

 https://spark.apache.org/docs/latest/streaming-programming-guide.html

 dstream.foreachRDD { rdd =>
   val connection = createNewConnection()  // executed at the driver
   rdd.foreach { record =>
     connection.send(record) // executed at the worker
   }
 }


 @td I've specifically looked at kafka socket connections for the standard
 1.3 code vs my branch that has cached connections.  The standard
 non-caching code has very short lived connections.  I've had jobs running
 for a month at a time, including ones writing to mysql.  Not saying it's
 impossible, but I'd think we need some evidence before speculating this has
 anything to do with it.


 On Wed, Apr 29, 2015 at 6:50 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 This function is called in foreachRDD. I think it should be running in
 the executors. I added the statement.close() in the code and it is running. I
 will let you know if this fixes the issue.



 On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com
 wrote:

 Is the function ingestToMysql running on the driver or on the
 executors? Accordingly you can try debugging while running in a distributed
 manner, with and without calling the function.

 If you don't get too many open files without calling ingestToMysql(),
 the problem is likely to be in ingestToMysql().
 If you get the problem even without calling ingestToMysql(), then the
 problem may be in Kafka. If the problem is occurring in the driver, then it's
 the DirectKafkaInputDStream code. If the problem is occurring in the
 executor, then the problem is in KafkaRDD.

 TD

 On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

 Maybe add statement.close() in finally block ?

 Streaming / Kafka experts may have better insight.

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

 Based on my understanding, the connector to Kafka should not open so
 many files. Do you think there is possible socket leakage? BTW, in every
 batch which is 5 seconds, I output some results to mysql:

   def ingestToMysql(data: Array[String]) {
     val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
     var sql = "insert into loggingserver1 values "
     data.foreach(line => sql += line)
     sql = sql.dropRight(1)
     sql += ";"
     logger.info(sql)
     var conn: java.sql.Connection = null
     try {
       conn = DriverManager.getConnection(url)
       val statement = conn.createStatement()
       statement.executeUpdate(sql)
     } catch {
       case e: Exception => logger.error(e.getMessage())
     } finally {
       if (conn != null) {
         conn.close
       }
     }
   }

 I am not sure whether the leakage originates from Kafka connector or
 the sql connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit
 */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay 
 bill.jaypeter...@gmail.com wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka
 in the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark
 log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for
 time 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException:
 Too many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-30 Thread Bill Jay
I terminated the old job and started a new one. Currently, the Spark
streaming job has been running for 2 hours, and when I use lsof, I do not
see many files related to the Spark job.

BTW, I am running Spark streaming in local[2] mode. The batch interval is 5
seconds and it reads around 50 lines each batch.

On Thu, Apr 30, 2015 at 11:15 AM, Cody Koeninger c...@koeninger.org wrote:

 Did you use lsof to see what files were opened during the job?

 On Thu, Apr 30, 2015 at 1:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 The data ingestion is in the outermost portion of the foreachRDD block.
 Although I now close the statement of the jdbc connection, the same exception
 happened again. It seems it is not related to the data ingestion part.

 On Wed, Apr 29, 2015 at 8:35 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Use lsof to see what files are actually being held open.

 That stacktrace looks to me like it's from the driver, not executors.
 Where in foreach is it being called?  The outermost portion of foreachRDD
 runs in the driver, the innermost portion runs in the executors.  From the
 docs:

 https://spark.apache.org/docs/latest/streaming-programming-guide.html

 dstream.foreachRDD { rdd =>
   val connection = createNewConnection()  // executed at the driver
   rdd.foreach { record =>
     connection.send(record) // executed at the worker
   }
 }


 @td I've specifically looked at kafka socket connections for the
 standard 1.3 code vs my branch that has cached connections.  The standard
 non-caching code has very short lived connections.  I've had jobs running
 for a month at a time, including ones writing to mysql.  Not saying it's
 impossible, but I'd think we need some evidence before speculating this has
 anything to do with it.


 On Wed, Apr 29, 2015 at 6:50 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 This function is called in foreachRDD. I think it should be running in
 the executors. I added the statement.close() in the code and it is running. I
 will let you know if this fixes the issue.



 On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com
 wrote:

 Is the function ingestToMysql running on the driver or on the
 executors? Accordingly you can try debugging while running in a
 distributed manner, with and without calling the function.

 If you don't get too many open files without calling ingestToMysql(),
 the problem is likely to be in ingestToMysql().
 If you get the problem even without calling ingestToMysql(), then the
 problem may be in Kafka. If the problem is occurring in the driver, then
 it's the DirectKafkaInputDStream code. If the problem is occurring in the
 executor, then the problem is in KafkaRDD.

 TD

 On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

 Maybe add statement.close() in finally block ?

 Streaming / Kafka experts may have better insight.

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
  wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

 Based on my understanding, the connector to Kafka should not open so
 many files. Do you think there is possible socket leakage? BTW, in every
 batch which is 5 seconds, I output some results to mysql:

   def ingestToMysql(data: Array[String]) {
     val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
     var sql = "insert into loggingserver1 values "
     data.foreach(line => sql += line)
     sql = sql.dropRight(1)
     sql += ";"
     logger.info(sql)
     var conn: java.sql.Connection = null
     try {
       conn = DriverManager.getConnection(url)
       val statement = conn.createStatement()
       statement.executeUpdate(sql)
     } catch {
       case e: Exception => logger.error(e.getMessage())
     } finally {
       if (conn != null) {
         conn.close
       }
     }
   }

 I am not sure whether the leakage originates from Kafka connector or
 the sql connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit
 */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay 
 bill.jaypeter...@gmail.com wrote:

 Hi all,

 I am using the direct approach to receive real-time data from
 Kafka in the following link:


 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark
 log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for
 time 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException:
 Too many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too 
 many
 open files, java.io.IOException: Too 

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Bill Jay
Thanks for the suggestion. I ran the command and the limit is 1024.

Based on my understanding, the connector to Kafka should not open so many
files. Do you think there is possible socket leakage? BTW, in every batch
which is 5 seconds, I output some results to mysql:

  def ingestToMysql(data: Array[String]) {
    val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
    var sql = "insert into loggingserver1 values "
    data.foreach(line => sql += line)
    sql = sql.dropRight(1)
    sql += ";"
    logger.info(sql)
    var conn: java.sql.Connection = null
    try {
      conn = DriverManager.getConnection(url)
      val statement = conn.createStatement()
      statement.executeUpdate(sql)
    } catch {
      case e: Exception => logger.error(e.getMessage())
    } finally {
      if (conn != null) {
        conn.close
      }
    }
  }

I am not sure whether the leakage originates from Kafka connector or the
sql connections.

Bill

On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka in
 the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Ted Yu
Can you run the command 'ulimit -n' to see the current limit ?

To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
Cheers

On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka in the
 following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many
 open files, java.io.IOException: Too many open files, java.io.IOException:
 Too many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Ted Yu
Maybe add statement.close() in finally block ?

Streaming / Kafka experts may have better insight.
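
A sketch of that change against the snippet quoted below (closing the
statement as well as the connection, each guarded independently):

    var conn: java.sql.Connection = null
    var statement: java.sql.Statement = null
    try {
      conn = java.sql.DriverManager.getConnection(url)
      statement = conn.createStatement()
      statement.executeUpdate(sql)
    } catch {
      case e: Exception => logger.error(e.getMessage())
    } finally {
      if (statement != null) statement.close()  // release the handle even on failure
      if (conn != null) conn.close()
    }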

On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

 Based on my understanding, the connector to Kafka should not open so many
 files. Do you think there is possible socket leakage? BTW, in every batch
 which is 5 seconds, I output some results to mysql:

   def ingestToMysql(data: Array[String]) {
     val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
     var sql = "insert into loggingserver1 values "
     data.foreach(line => sql += line)
     sql = sql.dropRight(1)
     sql += ";"
     logger.info(sql)
     var conn: java.sql.Connection = null
     try {
       conn = DriverManager.getConnection(url)
       val statement = conn.createStatement()
       statement.executeUpdate(sql)
     } catch {
       case e: Exception => logger.error(e.getMessage())
     } finally {
       if (conn != null) {
         conn.close
       }
     }
   }

 I am not sure whether the leakage originates from Kafka connector or the
 sql connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka in
 the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Tathagata Das
Is the function ingestToMysql running on the driver or on the executors?
Accordingly you can try debugging while running in a distributed manner,
with and without calling the function.

If you don't get too many open files without calling ingestToMysql(), the
problem is likely to be in ingestToMysql().
If you get the problem even without calling ingestToMysql(), then the
problem may be in Kafka. If the problem is occurring in the driver, then it's
the DirectKafkaInputDStream code. If the problem is occurring in the
executor, then the problem is in KafkaRDD.

TD

On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

 Maybe add statement.close() in the finally block?

 Streaming / Kafka experts may have better insight.

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

  Based on my understanding, the connector to Kafka should not open so many
  files. Do you think there is a possible socket leak? BTW, in every batch,
  which is 5 seconds, I output some results to MySQL:

    def ingestToMysql(data: Array[String]) {
      val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
      var sql = "insert into loggingserver1 values "
      data.foreach(line => sql += line)
      sql = sql.dropRight(1)
      sql += ";"
      logger.info(sql)
      var conn: java.sql.Connection = null
      try {
        conn = DriverManager.getConnection(url)
        val statement = conn.createStatement()
        statement.executeUpdate(sql)
      } catch {
        case e: Exception => logger.error(e.getMessage())
      } finally {
        if (conn != null) {
          conn.close()
        }
      }
    }

  I am not sure whether the leakage originates from the Kafka connector or the
  SQL connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

  I am using the direct approach to receive real-time data from Kafka,
  described at the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Bill Jay
This function is called in foreachRDD. I think it should be running in the
executors. I added the statement.close() call to the code and it is running. I
will let you know if this fixes the issue.
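
Where the call sits inside foreachRDD determines where it runs; a hedged
sketch of the two placements (stream and ingestToMysql as in the thread,
everything else hypothetical):

    // Variant 1: collect() pulls each batch back to the driver, so
    // ingestToMysql (and its JDBC connection) runs on the driver.
    stream.foreachRDD { rdd =>
      ingestToMysql(rdd.collect())
    }

    // Variant 2: foreachPartition ships the call to the executors, so each
    // partition opens (and must close) its own connection on a worker.
    stream.foreachRDD { rdd =>
      rdd.foreachPartition(partition => ingestToMysql(partition.toArray))
    }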



On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com wrote:

 Is the function ingestToMysql running on the driver or on the executors?
 Accordingly you can try debugging while running in a distributed manner,
 with and without calling the function.

  If you don't get too many open files without calling ingestToMysql(), the
  problem is likely to be in ingestToMysql().
  If you get the problem even without calling ingestToMysql(), then the
  problem may be in Kafka. If the problem is occurring in the driver, then it's
  the DirectKafkaInputDStream code. If the problem is occurring in the
  executor, then the problem is in KafkaRDD.

 TD

 On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

  Maybe add statement.close() in the finally block?

 Streaming / Kafka experts may have better insight.

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

  Based on my understanding, the connector to Kafka should not open so many
  files. Do you think there is a possible socket leak? BTW, in every batch,
  which is 5 seconds, I output some results to MySQL:

    def ingestToMysql(data: Array[String]) {
      val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
      var sql = "insert into loggingserver1 values "
      data.foreach(line => sql += line)
      sql = sql.dropRight(1)
      sql += ";"
      logger.info(sql)
      var conn: java.sql.Connection = null
      try {
        conn = DriverManager.getConnection(url)
        val statement = conn.createStatement()
        statement.executeUpdate(sql)
      } catch {
        case e: Exception => logger.error(e.getMessage())
      } finally {
        if (conn != null) {
          conn.close()
        }
      }
    }

  I am not sure whether the leakage originates from the Kafka connector or the
  SQL connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit
 */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

  I am using the direct approach to receive real-time data from Kafka,
  described at the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Tathagata Das
Also cc'ing Cody.

@Cody maybe there is a reason for doing connection pooling even if there is
no performance difference.

TD
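
For what a minimal cached-connection setup could look like, a sketch (not the
code from Cody's branch; the URL is the one posted earlier in the thread, and
a real pool would also handle broken connections):

    import java.sql.{Connection, DriverManager}

    // One lazily created connection per executor JVM, reused across batches,
    // so a 5-second batch interval does not open a new socket (and file
    // descriptor) every batch.
    object MysqlConnection {
      lazy val get: Connection =
        DriverManager.getConnection(
          "jdbc:mysql://localhost:3306/realtime?user=root&password=123")
    }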

On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com wrote:

 Is the function ingestToMysql running on the driver or on the executors?
 Accordingly you can try debugging while running in a distributed manner,
 with and without calling the function.

  If you don't get too many open files without calling ingestToMysql(), the
  problem is likely to be in ingestToMysql().
  If you get the problem even without calling ingestToMysql(), then the
  problem may be in Kafka. If the problem is occurring in the driver, then it's
  the DirectKafkaInputDStream code. If the problem is occurring in the
  executor, then the problem is in KafkaRDD.

 TD

 On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

  Maybe add statement.close() in the finally block?

 Streaming / Kafka experts may have better insight.

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

  Based on my understanding, the connector to Kafka should not open so many
  files. Do you think there is a possible socket leak? BTW, in every batch,
  which is 5 seconds, I output some results to MySQL:

    def ingestToMysql(data: Array[String]) {
      val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
      var sql = "insert into loggingserver1 values "
      data.foreach(line => sql += line)
      sql = sql.dropRight(1)
      sql += ";"
      logger.info(sql)
      var conn: java.sql.Connection = null
      try {
        conn = DriverManager.getConnection(url)
        val statement = conn.createStatement()
        statement.executeUpdate(sql)
      } catch {
        case e: Exception => logger.error(e.getMessage())
      } finally {
        if (conn != null) {
          conn.close()
        }
      }
    }

  I am not sure whether the leakage originates from the Kafka connector or the
  SQL connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit
 */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

  I am using the direct approach to receive real-time data from Kafka,
  described at the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Cody Koeninger
Use lsof to see what files are actually being held open.

That stack trace looks to me like it's from the driver, not the executors.
Where in foreach is it being called?  The outermost portion of foreachRDD
runs in the driver, the innermost portion runs in the executors.  From the
docs:

https://spark.apache.org/docs/latest/streaming-programming-guide.html

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}
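
For completeness, the same guide recommends creating the connection inside
foreachPartition, so one connection serves a whole partition on the worker
(createNewConnection is a placeholder, as in the docs):

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val connection = createNewConnection()  // executed at the worker
        partitionOfRecords.foreach(record => connection.send(record))
        connection.close()  // one open/close per partition, not per record
      }
    }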


@td I've specifically looked at Kafka socket connections for the standard
1.3 code vs my branch that has cached connections. The standard
non-caching code has very short-lived connections. I've had jobs running
for a month at a time, including ones writing to MySQL. Not saying it's
impossible, but I'd think we need some evidence before speculating this has
anything to do with it.


On Wed, Apr 29, 2015 at 6:50 PM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 This function is called in foreachRDD. I think it should be running in the
 executors. I added the statement.close() call to the code and it is running. I
 will let you know if this fixes the issue.



 On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com
 wrote:

 Is the function ingestToMysql running on the driver or on the executors?
 Accordingly you can try debugging while running in a distributed manner,
 with and without calling the function.

  If you don't get too many open files without calling ingestToMysql(),
  the problem is likely to be in ingestToMysql().
  If you get the problem even without calling ingestToMysql(), then the
  problem may be in Kafka. If the problem is occurring in the driver, then it's
  the DirectKafkaInputDStream code. If the problem is occurring in the
  executor, then the problem is in KafkaRDD.

 TD

 On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

  Maybe add statement.close() in the finally block?

 Streaming / Kafka experts may have better insight.

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

  Based on my understanding, the connector to Kafka should not open so many
  files. Do you think there is a possible socket leak? BTW, in every batch,
  which is 5 seconds, I output some results to MySQL:

    def ingestToMysql(data: Array[String]) {
      val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
      var sql = "insert into loggingserver1 values "
      data.foreach(line => sql += line)
      sql = sql.dropRight(1)
      sql += ";"
      logger.info(sql)
      var conn: java.sql.Connection = null
      try {
        conn = DriverManager.getConnection(url)
        val statement = conn.createStatement()
        statement.executeUpdate(sql)
      } catch {
        case e: Exception => logger.error(e.getMessage())
      } finally {
        if (conn != null) {
          conn.close()
        }
      }
    }

  I am not sure whether the leakage originates from the Kafka connector or
  the SQL connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit
 */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

  I am using the direct approach to receive real-time data from Kafka,
  described at the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark
 log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)

Re: Too many open files

2015-03-30 Thread Ted Yu
bq. In /etc/security/limits.conf I set the following values:

Have you done the above modification on all the machines in your Spark
cluster?

If you use Ubuntu, be sure that the /etc/pam.d/common-session file contains
the following line:

session required  pam_limits.so


On Mon, Mar 30, 2015 at 5:08 AM, Masf masfwo...@gmail.com wrote:

 Hi.

 I've re-logged in; in fact, 'ulimit -n' returns 100, but it still crashes.
 I'm doing reduceByKey and Spark SQL mixed over 17 files (250MB-500MB/file)


 Regards.
 Miguel Angel.

 On Mon, Mar 30, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

  In most cases, you will have to restart the machines (or re-login) for the
  ulimit change to take effect. What operation are you doing? Are you doing
  too many repartitions?

 Thanks
 Best Regards

 On Mon, Mar 30, 2015 at 4:52 PM, Masf masfwo...@gmail.com wrote:

 Hi

 I have a problem with temp data in Spark. I have set
 spark.shuffle.manager to SORT. In /etc/security/limits.conf I set the
 following values:
 *   soft   nofile  100
 *   hard   nofile  100
 In spark-env.sh I set ulimit -n 100
 I've restarted the Spark service and it continues crashing (Too many
 open files)

 How can I resolve this? I'm executing Spark 1.2.0 on Cloudera 5.3.2

 java.io.FileNotFoundException:
 /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
 (Too many open files)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.init(FileOutputStream.java:221)
 at
 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
 at scala.Array$.fill(Array.scala:267)
 at
 org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
 at
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
 at
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 15/03/30 11:54:18 WARN TaskSetManager: Lost task 22.0 in stage 3.0 (TID
 27, localhost): java.io.FileNotFoundException:
 /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
 (Too many open files)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.init(FileOutputStream.java:221)
 at
 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
 at scala.Array$.fill(Array.scala:267)
 at
 org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
 at
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
 at
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)



 Thanks!
 --


 Regards.
 Miguel Ángel





 --


 Regards.
 Miguel Ángel



Re: Too many open files

2015-03-30 Thread Masf
I'm executing my application in local mode (with --master local[*]).

I'm using Ubuntu and I've put session required  pam_limits.so into
/etc/pam.d/common-session, but it doesn't work.

On Mon, Mar 30, 2015 at 4:08 PM, Ted Yu yuzhih...@gmail.com wrote:

 bq. In /etc/security/limits.conf I set the following values:

 Have you done the above modification on all the machines in your Spark
 cluster?

 If you use Ubuntu, be sure that the /etc/pam.d/common-session file
 contains the following line:

 session required  pam_limits.so


 On Mon, Mar 30, 2015 at 5:08 AM, Masf masfwo...@gmail.com wrote:

 Hi.

 I've re-logged in; in fact, 'ulimit -n' returns 100, but it still crashes.
 I'm doing reduceByKey and Spark SQL mixed over 17 files (250MB-500MB/file)


 Regards.
 Miguel Angel.

 On Mon, Mar 30, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

  In most cases, you will have to restart the machines (or re-login) for the
  ulimit change to take effect. What operation are you doing? Are you doing
  too many repartitions?

 Thanks
 Best Regards

 On Mon, Mar 30, 2015 at 4:52 PM, Masf masfwo...@gmail.com wrote:

 Hi

 I have a problem with temp data in Spark. I have set
 spark.shuffle.manager to SORT. In /etc/security/limits.conf I set the
 following values:
 *   soft   nofile  100
 *   hard   nofile  100
 In spark-env.sh I set ulimit -n 100
 I've restarted the Spark service and it continues crashing (Too many
 open files)

 How can I resolve this? I'm executing Spark 1.2.0 on Cloudera 5.3.2

 java.io.FileNotFoundException:
 /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
 (Too many open files)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.init(FileOutputStream.java:221)
 at
 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
 at scala.Array$.fill(Array.scala:267)
 at
 org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
 at
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
 at
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 15/03/30 11:54:18 WARN TaskSetManager: Lost task 22.0 in stage 3.0 (TID
 27, localhost): java.io.FileNotFoundException:
 /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
 (Too many open files)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.init(FileOutputStream.java:221)
 at
 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
 at scala.Array$.fill(Array.scala:267)
 at
 org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
 at
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
 at
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)



 Thanks!
 --


 Regards.
 Miguel Ángel





 --


 Regards.
 Miguel Ángel





-- 


Regards.
Miguel Ángel


Re: Too many open files

2015-03-30 Thread Akhil Das
In most cases, you will have to restart the machines (or re-login) for the
ulimit change to take effect. What operation are you doing? Are you doing
too many repartitions?

Thanks
Best Regards

On Mon, Mar 30, 2015 at 4:52 PM, Masf masfwo...@gmail.com wrote:

 Hi

 I have a problem with temp data in Spark. I have set
 spark.shuffle.manager to SORT. In /etc/security/limits.conf I set the
 following values:
 *   soft   nofile  100
 *   hard   nofile  100
 In spark-env.sh I set ulimit -n 100
 I've restarted the Spark service and it continues crashing (Too many
 open files)

 How can I resolve this? I'm executing Spark 1.2.0 on Cloudera 5.3.2

 java.io.FileNotFoundException:
 /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
 (Too many open files)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.init(FileOutputStream.java:221)
 at
 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
 at scala.Array$.fill(Array.scala:267)
 at
 org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
 at
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
 at
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 15/03/30 11:54:18 WARN TaskSetManager: Lost task 22.0 in stage 3.0 (TID
 27, localhost): java.io.FileNotFoundException:
 /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
 (Too many open files)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.init(FileOutputStream.java:221)
 at
 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
 at scala.Array$.fill(Array.scala:267)
 at
 org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
 at
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
 at
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)



 Thanks!
 --


 Regards.
 Miguel Ángel



Re: Too many open files

2015-03-30 Thread Masf
Hi.

I've re-logged in; in fact, 'ulimit -n' returns 100, but it still crashes.
I'm doing reduceByKey and Spark SQL mixed over 17 files (250MB-500MB/file)


Regards.
Miguel Angel.

On Mon, Mar 30, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 In most cases, you will have to restart the machines (or re-login) for the
 ulimit change to take effect. What operation are you doing? Are you doing
 too many repartitions?

 Thanks
 Best Regards

 On Mon, Mar 30, 2015 at 4:52 PM, Masf masfwo...@gmail.com wrote:

 Hi

 I have a problem with temp data in Spark. I have set
 spark.shuffle.manager to SORT. In /etc/security/limits.conf I set the
 following values:
 *   soft   nofile  100
 *   hard   nofile  100
 In spark-env.sh I set ulimit -n 100
 I've restarted the Spark service and it continues crashing (Too many
 open files)

 How can I resolve this? I'm executing Spark 1.2.0 on Cloudera 5.3.2

 java.io.FileNotFoundException:
 /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
 (Too many open files)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.init(FileOutputStream.java:221)
 at
 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
 at scala.Array$.fill(Array.scala:267)
 at
 org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
 at
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
 at
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 15/03/30 11:54:18 WARN TaskSetManager: Lost task 22.0 in stage 3.0 (TID
 27, localhost): java.io.FileNotFoundException:
 /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
 (Too many open files)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.init(FileOutputStream.java:221)
 at
 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
 at
 org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
 at scala.Array$.fill(Array.scala:267)
 at
 org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
 at
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
 at
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)



 Thanks!
 --


 Regards.
 Miguel Ángel





-- 


Regards.
Miguel Ángel


Re: Too many open files

2014-08-29 Thread SK
Hi,

I am having the same problem reported by Michael. I am trying to open 30
files. ulimit -n shows the limit is 1024, so I am not sure why the program
is failing with a Too many open files error. The total size of all 30
files is 230 GB.
I am running the job on a cluster with 10 nodes, each having 16 GB. The
error appears to be happening at the distinct() stage.

Here is my program. In the following code, are all the 10 nodes trying to
open all of the 30 files, or are the files distributed among the 10 nodes?

val baseFile = "/mapr/mapr_dir/files_2013apr*"
val x = sc.textFile(baseFile).map { line =>
  val fields = line.split("\t")
  (fields(11), fields(6))
}.distinct().countByKey()
val xrdd = sc.parallelize(x.toSeq)
xrdd.saveAsTextFile(...)

Instead of using the glob *, I guess I can try using a for loop to read the
files one by one if that helps, but I am not sure if there is a more efficient
solution.
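
A minimal sketch of that per-file loop (the explicit paths and output
location are hypothetical stand-ins for the glob; note it only changes how
the files are read, not the shuffle files opened by distinct()):

    import org.apache.spark.{SparkConf, SparkContext}

    object PerFileRead {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("per-file-read"))
        // Hypothetical explicit paths standing in for the glob expansion.
        val paths = (1 to 30).map(d => f"/mapr/mapr_dir/files_2013apr$d%02d")
        // Read and union the files one at a time instead of globbing all 30.
        val combined = paths.map(sc.textFile(_)).reduce(_ union _)
        val x = combined.map { line =>
          val fields = line.split("\t")
          (fields(11), fields(6))
        }.distinct().countByKey()
        sc.parallelize(x.toSeq).saveAsTextFile("/mapr/mapr_dir/output")
      }
    }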

The following is the error transcript: 

Job aborted due to stage failure: Task 1.0:201 failed 4 times, most recent
failure: Exception failure in TID 902 on host 192.168.13.11:
java.io.FileNotFoundException:
/tmp/spark-local-20140829131200-0bb7/08/shuffle_0_201_999 (Too many open
files) 
java.io.FileOutputStream.open(Native Method)
java.io.FileOutputStream.init(FileOutputStream.java:221)
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
org.apache.spark.util.collection.AppendOnlyMap$$anon$1.foreach(AppendOnlyMap.scala:159)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744) Driver stacktrace:





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-tp1464p13144.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Too many open files

2014-08-29 Thread Ye Xianjin
Oops, the last reply didn't go to the user list. Mail app's fault.

Shuffling happens in the cluster, so you need to change the limit on all the
nodes in the cluster.
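
One way to confirm which limit each node's JVM actually inherited is to read
/proc/self/limits from inside a task (a Linux-only sketch, assuming a
SparkContext named sc; not from the original thread):

    // Runs many small tasks and reports the "Max open files" line that each
    // executor JVM actually sees, keyed by hostname.
    val perHost = sc.parallelize(1 to 100, 100).map { _ =>
      val host = java.net.InetAddress.getLocalHost.getHostName
      val limit = scala.io.Source.fromFile("/proc/self/limits")
        .getLines().find(_.startsWith("Max open files")).getOrElse("unknown")
      (host, limit)
    }.distinct().collect()
    perHost.foreach { case (host, limit) => println(s"$host: $limit") }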



Sent from my iPhone

 On Aug 30, 2014, at 3:10, Sudha Krishna skrishna...@gmail.com wrote:
 
 Hi,
 
 Thanks for your response. Do you know if I need to change this limit on all 
 the cluster nodes or just the master?
 Thanks
 
 On Aug 29, 2014 11:43 AM, Ye Xianjin advance...@gmail.com wrote:
 A limit of 1024 open files is most likely too small for Linux machines in
 production. Try setting it to 65536 or unlimited if you can. The too many
 open files error occurs because there are a lot of shuffle files (if wrong,
 please correct me).
 
 Sent from my iPhone
 
  On Aug 30, 2014, at 2:06, SK skrishna...@gmail.com wrote:
 
  Hi,
 
  I am having the same problem reported by Michael. I am trying to open 30
  files. ulimit -n shows the limit is 1024, so I am not sure why the program
  is failing with a Too many open files error. The total size of all 30
  files is 230 GB.
  I am running the job on a cluster with 10 nodes, each having 16 GB. The
  error appears to be happening at the distinct() stage.

  Here is my program. In the following code, are all the 10 nodes trying to
  open all of the 30 files, or are the files distributed among the 10 nodes?
 
 val baseFile = "/mapr/mapr_dir/files_2013apr*"
 val x = sc.textFile(baseFile).map { line =>
   val fields = line.split("\t")
   (fields(11), fields(6))
 }.distinct().countByKey()
 val xrdd = sc.parallelize(x.toSeq)
 xrdd.saveAsTextFile(...)
 
  Instead of using the glob *, I guess I can try using a for loop to read the
  files one by one if that helps, but I am not sure if there is a more
  efficient solution.
 
  The following is the error transcript:
 
  Job aborted due to stage failure: Task 1.0:201 failed 4 times, most recent
  failure: Exception failure in TID 902 on host 192.168.13.11:
  java.io.FileNotFoundException:
  /tmp/spark-local-20140829131200-0bb7/08/shuffle_0_201_999 (Too many open
  files)
  java.io.FileOutputStream.open(Native Method)
  java.io.FileOutputStream.init(FileOutputStream.java:221)
  org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
  org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
  org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
  org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
  scala.collection.Iterator$class.foreach(Iterator.scala:727)
  org.apache.spark.util.collection.AppendOnlyMap$$anon$1.foreach(AppendOnlyMap.scala:159)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  org.apache.spark.scheduler.Task.run(Task.scala:51)
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  java.lang.Thread.run(Thread.java:744) Driver stacktrace:
 
 
 
 
 
  --
  View this message in context: 
  http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-tp1464p13144.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 


Re: Too many open files exception on reduceByKey

2014-03-10 Thread Patrick Wendell
Hey Matt,

The best way is definitely just to increase the ulimit if possible;
it is sort of an assumption we make in Spark that clusters will be
able to raise it.

You might be able to hack around this by decreasing the number of
reducers but this could have some performance implications for your
job.

In general, if a node in your cluster has C assigned cores and you run
a job with X reducers, then Spark will open C*X files in parallel and
start writing. Shuffle consolidation will help decrease the total
number of files created, but the number of file handles open at any
time doesn't change, so it won't help the ulimit problem.

This means you'll have to use fewer reducers (e.g., pass reduceByKey an
explicit number of reducers) or use fewer cores on each machine; see the
sketch below.
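
A minimal sketch of capping the reducer count so that C * X stays under the
descriptor limit (the input path and the reducer count of 64 are
hypothetical):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("fewer-reducers")
      // Consolidation cuts the total files created, though not the number
      // of handles open at any one time.
      .set("spark.shuffle.consolidateFiles", "true")
    val sc = new SparkContext(conf)

    val counts = sc.textFile("hdfs:///data/words")
      .map(word => (word, 1))
      .reduceByKey(_ + _, 64)  // X = 64 reducers instead of the default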

- Patrick

On Mon, Mar 10, 2014 at 10:41 AM, Matthew Cheah
matthew.c.ch...@gmail.com wrote:
 Hi everyone,

 My team (cc'ed in this e-mail) and I are running a Spark reduceByKey
 operation on a cluster of 10 slaves where I don't have the privileges to set
 ulimit -n to a higher number. I'm running on a cluster where ulimit -n
 returns 1024 on each machine.

 When I attempt to run this job with the data originating from a text file,
 stored in an HDFS cluster running on the same nodes as the Spark cluster,
 the job crashes with the message, Too many open files.

 My question is, why are so many files being created, and is there a way to
 configure the Spark context to avoid spawning that many files? I am already
 setting spark.shuffle.consolidateFiles to true.

 I want to repeat - I can't change the maximum number of open file
 descriptors on the machines. This cluster is not owned by me and the system
 administrator is responding quite slowly.

 Thanks,

 -Matt Cheah