: Failed to create file system watcher service: User limit of inotify instances reached or too many open files

2018-08-22 Thread Polisetti, Venkata Siva Rama Gopala Krishna
Hi,

When I run the calculations for, say, 700 listIDs, only about 50 rows get saved 
and then I start getting seemingly random exceptions.

I get the exception below when I do the calculations on a large data set and try 
to save the results. Please let me know if you have any suggestions.

Sample code:

I have a few hundred thousand (lakhs of) listIDs. I group the RDD with 
rdd.groupBy(row => row.getInt(6)) and then use .map to do all the ranking 
calculations.

Step 1: val groupedPartitionRdd = rdd.groupBy(row => row.getInt(6))
Step 2: val outputObjectForGainersAndLosers = groupedPartitionRdd.map(grp => {
          // ranking logic and some calculations per group
        })
Step 3: outputObjectForGainersAndLosers.saveToCassandra("keyspace", "tablename", someColumns)


I am getting these seemingly random exceptions on every spark-submit run; I am 
not able to debug them and am facing a lot of issues.
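For reference, a minimal self-contained sketch of the pipeline described above, assuming the 
DataStax spark-cassandra-connector is on the classpath; the keyspace, table, column names, 
connection host, toy data, and the placeholder ranking logic are all illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import com.datastax.spark.connector._   // brings saveToCassandra and SomeColumns into scope

val conf = new SparkConf()
  .setAppName("GainersAndLosersSketch")
  .set("spark.cassandra.connection.host", "127.0.0.1")   // illustrative host
val sc = new SparkContext(conf)

// Toy stand-in for the real data; column index 6 holds the listID used for grouping.
val rdd = sc.parallelize(Seq(
  Row("a", 0, 0, 0, 0, 0, 101, 1.5),
  Row("b", 0, 0, 0, 0, 0, 101, 2.5),
  Row("c", 0, 0, 0, 0, 0, 102, 0.7)))

// Step 1: group all rows that share the same listID.
val groupedPartitionRdd = rdd.groupBy(row => row.getInt(6))

// Step 2: produce one output record per listID group; the ranking logic is a placeholder.
val outputObjectForGainersAndLosers = groupedPartitionRdd.map { case (listId, rows) =>
  (listId, rows.size)
}

// Step 3: save the results; keyspace, table, and column names are illustrative.
outputObjectForGainersAndLosers.saveToCassandra(
  "my_keyspace", "tablename", SomeColumns("list_id", "row_count"))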

Caused by: java.lang.RuntimeException: Failed to create file system watcher 
service: User limit of inotify instances reached or too many open files
Caused by: java.io.IOException: User limit of inotify instances reached or too 
many open files
Failed to create file system watcher service: User limit of inotify instances 
reached or too many open files
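On Linux, the inotify side of this message is governed by kernel sysctls such as 
fs.inotify.max_user_instances; a minimal check and (illustrative) increase, assuming root 
on the affected nodes:

cat /proc/sys/fs/inotify/max_user_instances    # current per-user limit
sysctl -w fs.inotify.max_user_instances=1024   # raise it (value is illustrative)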

Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
at 
org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:154)
at 
org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:134)
at 
org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:644)
at 
org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:178)
at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
at org.apache.spark.network.server.TransportChannelHandler.channelRead

Thanks,
Gopi





Re: [SparkSQL] too many open files although ulimit set to 1048576

2017-03-13 Thread darin
I think your settings are not taking effect.
Try adding `ulimit -n 10240` to spark-env.sh.
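For example, assuming a standard conf/spark-env.sh that is sourced on every node before the 
Spark daemons start:

# conf/spark-env.sh
ulimit -n 10240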




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-too-many-open-files-although-ulimit-set-to-1048576-tp28490p28491.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Too many open files, why changing ulimit not effecting?

2016-02-10 Thread Michael Diamant
If you are using systemd, you will need to specify the limit in the service
file.  I had run into this problem and discovered the solution from the
following references:
* https://bugzilla.redhat.com/show_bug.cgi?id=754285#c1
* http://serverfault.com/a/678861
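As a rough illustration of what those references describe (the unit name is hypothetical; per 
the links above, PAM/limits.conf limits do not apply to services started by systemd):

# /etc/systemd/system/spark-worker.service  (hypothetical unit name)
[Service]
LimitNOFILE=1048576

# then reload systemd and restart the service:
#   systemctl daemon-reload
#   systemctl restart spark-worker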

On Fri, Feb 5, 2016 at 1:18 PM, Nirav Patel  wrote:

> For centos there's also /etc/security/limits.d/90-nproc.conf  that may
> need modifications.
>
> Services that you expect to use new limits needs to be restarted. Simple
> thing to do is to reboot the machine.
>
> On Fri, Feb 5, 2016 at 3:59 AM, Ted Yu  wrote:
>
>> bq. and *"session required pam_limits.so"*.
>>
>> What was the second file you modified ?
>>
>> Did you make the change on all the nodes ?
>>
>> Please see the verification step in
>> https://easyengine.io/tutorials/linux/increase-open-files-limit/
>>
>> On Fri, Feb 5, 2016 at 1:42 AM, Mohamed Nadjib MAMI > > wrote:
>>
>>> Hello all,
>>>
>>> I'm getting the famous *java.io.FileNotFoundException: ... (Too many
>>> open files) *exception. What seemed to have helped people out, it
>>> haven't for me. I tried to set the ulimit via the command line *"ulimit
>>> -n"*, then I tried to add the following lines to
>>> *"/etc/security/limits.conf"* file:
>>>
>>> ** - nofile 100*
>>> *root soft nofile 100*
>>> *root hard nofile 100*
>>> *hduser soft nofile 100*
>>> *hduser hard nofile 100*
>>>
>>> ...then I added this line *"session required pam_limits.so"* to the two
>>> files* "/etc/pam.d/common-session"* and *"session required
>>> pam_limits.so"*. The I logged-out/logged-in. First, I tried only the
>>> first line (** - nofile 100**)*, then added the 2nd and the 3rd
>>> (root...),  then added the last two lines (hduser...), no effect. Weirdly
>>> enough, when I check with the command *"ulimit -n"* it returns the
>>> correct value of 100.
>>>
>>> I then added *"ulimit -n 100"* to *"spark-env.sh"* in the master
>>> and in each of my workers, no effect.
>>>
>>> What else could it be besides changing the ulimit setting? if it's only
>>> that, what could cause Spark to ignore it?
>>>
>>> I'll appreciate any help in advance.
>>>
>>> --
>>> *PhD Student - EIS Group - Bonn University, Germany.*
>>> *+49 1575 8482232 <%2B49%201575%208482232>*
>>>
>>>
>>
>
>
>


Re: Too many open files, why changing ulimit not effecting?

2016-02-05 Thread Nirav Patel
For CentOS there is also /etc/security/limits.d/90-nproc.conf, which may need
modifications.
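For example (the values are illustrative; files under /etc/security/limits.d/ use the same 
syntax as limits.conf):

# /etc/security/limits.d/90-nproc.conf
*    soft    nproc     65536
*    soft    nofile    65536
*    hard    nofile    65536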

Services that you expect to pick up the new limits need to be restarted. The
simplest thing to do is to reboot the machine.

On Fri, Feb 5, 2016 at 3:59 AM, Ted Yu  wrote:

> bq. and *"session required pam_limits.so"*.
>
> What was the second file you modified ?
>
> Did you make the change on all the nodes ?
>
> Please see the verification step in
> https://easyengine.io/tutorials/linux/increase-open-files-limit/
>
> On Fri, Feb 5, 2016 at 1:42 AM, Mohamed Nadjib MAMI 
> wrote:
>
>> Hello all,
>>
>> I'm getting the famous *java.io.FileNotFoundException: ... (Too many
>> open files) *exception. What seemed to have helped people out, it
>> haven't for me. I tried to set the ulimit via the command line *"ulimit
>> -n"*, then I tried to add the following lines to
>> *"/etc/security/limits.conf"* file:
>>
>> ** - nofile 100*
>> *root soft nofile 100*
>> *root hard nofile 100*
>> *hduser soft nofile 100*
>> *hduser hard nofile 100*
>>
>> ...then I added this line *"session required pam_limits.so"* to the two
>> files* "/etc/pam.d/common-session"* and *"session required
>> pam_limits.so"*. The I logged-out/logged-in. First, I tried only the
>> first line (** - nofile 100**)*, then added the 2nd and the 3rd
>> (root...),  then added the last two lines (hduser...), no effect. Weirdly
>> enough, when I check with the command *"ulimit -n"* it returns the
>> correct value of 100.
>>
>> I then added *"ulimit -n 100"* to *"spark-env.sh"* in the master and
>> in each of my workers, no effect.
>>
>> What else could it be besides changing the ulimit setting? if it's only
>> that, what could cause Spark to ignore it?
>>
>> I'll appreciate any help in advance.
>>
>> --
>> *PhD Student - EIS Group - Bonn University, Germany.*
>> *+49 1575 8482232 <%2B49%201575%208482232>*
>>
>>
>



Re: Too many open files, why changing ulimit not effecting?

2016-02-05 Thread Ted Yu
bq. and *"session required pam_limits.so"*.

What was the second file you modified ?

Did you make the change on all the nodes ?

Please see the verification step in
https://easyengine.io/tutorials/linux/increase-open-files-limit/
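Besides the verification step in that link, one way to see the limits a running process 
actually got (the PID is illustrative) is:

# shows the soft/hard "Max open files" values applied to that process
cat /proc/<executor-pid>/limits | grep "open files"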

On Fri, Feb 5, 2016 at 1:42 AM, Mohamed Nadjib MAMI 
wrote:

> Hello all,
>
> I'm getting the famous *java.io.FileNotFoundException: ... (Too many open
> files) *exception. What seemed to have helped people out, it haven't for
> me. I tried to set the ulimit via the command line *"ulimit -n"*, then I
> tried to add the following lines to *"/etc/security/limits.conf"* file:
>
> ** - nofile 100*
> *root soft nofile 100*
> *root hard nofile 100*
> *hduser soft nofile 100*
> *hduser hard nofile 100*
>
> ...then I added this line *"session required pam_limits.so"* to the two
> files* "/etc/pam.d/common-session"* and *"session required pam_limits.so"*.
> The I logged-out/logged-in. First, I tried only the first line (** -
> nofile 100**)*, then added the 2nd and the 3rd (root...),  then added
> the last two lines (hduser...), no effect. Weirdly enough, when I check
> with the command *"ulimit -n"* it returns the correct value of 100.
>
> I then added *"ulimit -n 100"* to *"spark-env.sh"* in the master and
> in each of my workers, no effect.
>
> What else could it be besides changing the ulimit setting? if it's only
> that, what could cause Spark to ignore it?
>
> I'll appreciate any help in advance.
>
> --
> *PhD Student - EIS Group - Bonn University, Germany.*
> *+49 1575 8482232 <%2B49%201575%208482232>*
>
>


Too many open files, why changing ulimit not effecting?

2016-02-05 Thread Mohamed Nadjib MAMI

Hello all,

I'm getting the famous java.io.FileNotFoundException: ... (Too many 
open files) exception. What seems to have helped other people out 
hasn't worked for me. I tried to set the ulimit via the command line 
("ulimit -n"), then I tried to add the following lines to the 
"/etc/security/limits.conf" file:

* - nofile 100
root soft nofile 100
root hard nofile 100
hduser soft nofile 100
hduser hard nofile 100

...then I added the line "session required pam_limits.so" to the two 
files "/etc/pam.d/common-session" and "session required 
pam_limits.so". Then I logged out and logged back in. First I tried only the 
first line (* - nofile 100), then added the 2nd and the 3rd 
(root...), then added the last two lines (hduser...), with no effect. 
Weirdly enough, when I check with the command "ulimit -n" it returns 
the correct value of 100.


I then added /"ulimit -n 100"/ to /"spark-env.sh"/ in the master and 
in each of my workers, no effect.


What else could it be besides changing the ulimit setting? if it's only 
that, what could cause Spark to ignore it?


I'll appreciate any help in advance.

--
/PhD Student - EIS Group - Bonn University, Germany.//
//+49 1575 8482232/



Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2016-01-06 Thread Priya Ch
The line of code which I highlighted in the screenshot is within the Spark
source code. Spark uses a sort-based shuffle implementation, and the
spilled files are merged using merge sort.

Here is the link
https://issues.apache.org/jira/secure/attachment/12655884/Sort-basedshuffledesign.pdf
which describes the same design.

On Wed, Jan 6, 2016 at 8:19 PM, Annabel Melongo 
wrote:

> Priya,
>
> It would be helpful if you put the entire trace log along with your code
> to help determine the root cause of the error.
>
> Thanks
>
>
> On Wednesday, January 6, 2016 4:00 AM, Priya Ch <
> learnings.chitt...@gmail.com> wrote:
>
>
> Running 'lsof' will let us know the open files but how do we come to know
> the root cause behind opening too many files.
>
> Thanks,
> Padma CH
>
> On Wed, Jan 6, 2016 at 8:39 AM, Hamel Kothari 
> wrote:
>
> The "Too Many Files" part of the exception is just indicative of the fact
> that when that call was made, too many files were already open. It doesn't
> necessarily mean that that line is the source of all of the open files,
> that's just the point at which it hit its limit.
>
> What I would recommend is to try to run this code again and use "lsof" on
> one of the spark executors (perhaps run it in a for loop, writing the
> output to separate files) until it fails and see which files are being
> opened, if there's anything that seems to be taking up a clear majority
> that might key you in on the culprit.
>
> On Tue, Jan 5, 2016 at 9:48 PM Priya Ch 
> wrote:
>
> Yes, the fileinputstream is closed. May be i didn't show in the screen
> shot .
>
> As spark implements, sort-based shuffle, there is a parameter called
> maximum merge factor which decides the number of files that can be merged
> at once and this avoids too many open files. I am suspecting that it is
> something related to this.
>
> Can someone confirm on this ?
>
> On Tue, Jan 5, 2016 at 11:19 PM, Annabel Melongo <
> melongo_anna...@yahoo.com> wrote:
>
> Vijay,
>
> Are you closing the fileinputstream at the end of each loop ( in.close())?
> My guess is those streams aren't close and thus the "too many open files"
> exception.
>
>
> On Tuesday, January 5, 2016 8:03 AM, Priya Ch <
> learnings.chitt...@gmail.com> wrote:
>
>
> Can some one throw light on this ?
>
> Regards,
> Padma Ch
>
> On Mon, Dec 28, 2015 at 3:59 PM, Priya Ch 
> wrote:
>
> Chris, we are using spark 1.3.0 version. we have not set  
> spark.streaming.concurrentJobs
> this parameter. It takes the default value.
>
> Vijay,
>
>   From the tack trace it is evident that 
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
> is throwing the exception. I opened the spark source code and visited the
> line which is throwing this exception i.e
>
> [image: Inline image 1]
>
> The lie which is marked in red is throwing the exception. The file is
> ExternalSorter.scala in org.apache.spark.util.collection package.
>
> i went through the following blog
> http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/
> and understood that there is merge factor which decide the number of
> on-disk files that could be merged. Is it some way related to this ?
>
> Regards,
> Padma CH
>
> On Fri, Dec 25, 2015 at 7:51 PM, Chris Fregly  wrote:
>
> and which version of Spark/Spark Streaming are you using?
>
> are you explicitly setting the spark.streaming.concurrentJobs to
> something larger than the default of 1?
>
> if so, please try setting that back to 1 and see if the problem still
> exists.
>
> this is a dangerous parameter to modify from the default - which is why
> it's not well-documented.
>
>
> On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge 
> wrote:
>
> Few indicators -
>
> 1) during execution time - check total number of open files using lsof
> command. Need root permissions. If it is cluster not sure much !
> 2) which exact line in the code is triggering this error ? Can you paste
> that snippet ?
>
>
> On Wednesday 23 December 2015, Priya Ch 
> wrote:
>
> ulimit -n 65000
>
> fs.file-max = 65000 ( in etc/sysctl.conf file)
>
> Thanks,
> Padma Ch
>
> On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma  wrote:
>
> Could you share the ulimit for your setup please ?
> - Thanks, via mobile,  excuse brevity.
> On Dec 22, 2015 6:39 PM, "Priya Ch"  wrote:
>
> Jakob,
>
>Increased the settings like fs.file-max in /etc/sysctl.conf and also
> increased user limi

Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2016-01-06 Thread Priya Ch
Running 'lsof' will tell us which files are open, but how do we find the root
cause behind opening so many files?

Thanks,
Padma CH

On Wed, Jan 6, 2016 at 8:39 AM, Hamel Kothari 
wrote:

> The "Too Many Files" part of the exception is just indicative of the fact
> that when that call was made, too many files were already open. It doesn't
> necessarily mean that that line is the source of all of the open files,
> that's just the point at which it hit its limit.
>
> What I would recommend is to try to run this code again and use "lsof" on
> one of the spark executors (perhaps run it in a for loop, writing the
> output to separate files) until it fails and see which files are being
> opened, if there's anything that seems to be taking up a clear majority
> that might key you in on the culprit.
>
> On Tue, Jan 5, 2016 at 9:48 PM Priya Ch 
> wrote:
>
>> Yes, the fileinputstream is closed. May be i didn't show in the screen
>> shot .
>>
>> As spark implements, sort-based shuffle, there is a parameter called
>> maximum merge factor which decides the number of files that can be merged
>> at once and this avoids too many open files. I am suspecting that it is
>> something related to this.
>>
>> Can someone confirm on this ?
>>
>> On Tue, Jan 5, 2016 at 11:19 PM, Annabel Melongo <
>> melongo_anna...@yahoo.com> wrote:
>>
>>> Vijay,
>>>
>>> Are you closing the fileinputstream at the end of each loop (
>>> in.close())? My guess is those streams aren't close and thus the "too many
>>> open files" exception.
>>>
>>>
>>> On Tuesday, January 5, 2016 8:03 AM, Priya Ch <
>>> learnings.chitt...@gmail.com> wrote:
>>>
>>>
>>> Can some one throw light on this ?
>>>
>>> Regards,
>>> Padma Ch
>>>
>>> On Mon, Dec 28, 2015 at 3:59 PM, Priya Ch 
>>> wrote:
>>>
>>> Chris, we are using spark 1.3.0 version. we have not set  
>>> spark.streaming.concurrentJobs
>>> this parameter. It takes the default value.
>>>
>>> Vijay,
>>>
>>>   From the tack trace it is evident that 
>>> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
>>> is throwing the exception. I opened the spark source code and visited the
>>> line which is throwing this exception i.e
>>>
>>> [image: Inline image 1]
>>>
>>> The lie which is marked in red is throwing the exception. The file is
>>> ExternalSorter.scala in org.apache.spark.util.collection package.
>>>
>>> i went through the following blog
>>> http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/
>>> and understood that there is merge factor which decide the number of
>>> on-disk files that could be merged. Is it some way related to this ?
>>>
>>> Regards,
>>> Padma CH
>>>
>>> On Fri, Dec 25, 2015 at 7:51 PM, Chris Fregly  wrote:
>>>
>>> and which version of Spark/Spark Streaming are you using?
>>>
>>> are you explicitly setting the spark.streaming.concurrentJobs to
>>> something larger than the default of 1?
>>>
>>> if so, please try setting that back to 1 and see if the problem still
>>> exists.
>>>
>>> this is a dangerous parameter to modify from the default - which is why
>>> it's not well-documented.
>>>
>>>
>>> On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge 
>>> wrote:
>>>
>>> Few indicators -
>>>
>>> 1) during execution time - check total number of open files using lsof
>>> command. Need root permissions. If it is cluster not sure much !
>>> 2) which exact line in the code is triggering this error ? Can you paste
>>> that snippet ?
>>>
>>>
>>> On Wednesday 23 December 2015, Priya Ch 
>>> wrote:
>>>
>>> ulimit -n 65000
>>>
>>> fs.file-max = 65000 ( in etc/sysctl.conf file)
>>>
>>> Thanks,
>>> Padma Ch
>>>
>>> On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma  wrote:
>>>
>>> Could you share the ulimit for your setup please ?
>>> - Thanks, via mobile,  excuse brevity.
>>> On Dec 22, 2015 6:39 PM, "Priya Ch" 
>>> wrote:
>>>
>>> Jakob,
>>>
>>>Increased the settings like fs.file-max in /etc/sysctl.conf and also
>>> increased user limit in /etc/security/limits.conf. But still see the
>>> same issue.
>>>
>>> On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky 
>>> wrote:
>>>
>>> It might be a good idea to see how many files are open and try
>>> increasing the open file limit (this is done on an os level). In some
>>> application use-cases it is actually a legitimate need.
>>>
>>> If that doesn't help, make sure you close any unused files and streams
>>> in your code. It will also be easier to help diagnose the issue if you send
>>> an error-reproducing snippet.
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Regards,
>>> Vijay Gharge
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>>
>>> *Chris Fregly*
>>> Principal Data Solutions Engineer
>>> IBM Spark Technology Center, San Francisco, CA
>>> http://spark.tc | http://advancedspark.com
>>>
>>>
>>>
>>>
>>>
>>>
>>


Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2016-01-05 Thread Priya Ch
Yes, the FileInputStream is closed. Maybe I didn't show it in the screenshot.

As Spark implements a sort-based shuffle, there is a parameter called
maximum merge factor which decides the number of files that can be merged
at once, and this avoids too many open files. I suspect it is
something related to this.

Can someone confirm this?

On Tue, Jan 5, 2016 at 11:19 PM, Annabel Melongo 
wrote:

> Vijay,
>
> Are you closing the fileinputstream at the end of each loop ( in.close())?
> My guess is those streams aren't close and thus the "too many open files"
> exception.
>
>
> On Tuesday, January 5, 2016 8:03 AM, Priya Ch <
> learnings.chitt...@gmail.com> wrote:
>
>
> Can some one throw light on this ?
>
> Regards,
> Padma Ch
>
> On Mon, Dec 28, 2015 at 3:59 PM, Priya Ch 
> wrote:
>
> Chris, we are using spark 1.3.0 version. we have not set  
> spark.streaming.concurrentJobs
> this parameter. It takes the default value.
>
> Vijay,
>
>   From the tack trace it is evident that 
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
> is throwing the exception. I opened the spark source code and visited the
> line which is throwing this exception i.e
>
> [image: Inline image 1]
>
> The lie which is marked in red is throwing the exception. The file is
> ExternalSorter.scala in org.apache.spark.util.collection package.
>
> i went through the following blog
> http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/
> and understood that there is merge factor which decide the number of
> on-disk files that could be merged. Is it some way related to this ?
>
> Regards,
> Padma CH
>
> On Fri, Dec 25, 2015 at 7:51 PM, Chris Fregly  wrote:
>
> and which version of Spark/Spark Streaming are you using?
>
> are you explicitly setting the spark.streaming.concurrentJobs to
> something larger than the default of 1?
>
> if so, please try setting that back to 1 and see if the problem still
> exists.
>
> this is a dangerous parameter to modify from the default - which is why
> it's not well-documented.
>
>
> On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge 
> wrote:
>
> Few indicators -
>
> 1) during execution time - check total number of open files using lsof
> command. Need root permissions. If it is cluster not sure much !
> 2) which exact line in the code is triggering this error ? Can you paste
> that snippet ?
>
>
> On Wednesday 23 December 2015, Priya Ch 
> wrote:
>
> ulimit -n 65000
>
> fs.file-max = 65000 ( in etc/sysctl.conf file)
>
> Thanks,
> Padma Ch
>
> On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma  wrote:
>
> Could you share the ulimit for your setup please ?
> - Thanks, via mobile,  excuse brevity.
> On Dec 22, 2015 6:39 PM, "Priya Ch"  wrote:
>
> Jakob,
>
>Increased the settings like fs.file-max in /etc/sysctl.conf and also
> increased user limit in /etc/security/limits.conf. But still see the same
> issue.
>
> On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky 
> wrote:
>
> It might be a good idea to see how many files are open and try increasing
> the open file limit (this is done on an os level). In some application
> use-cases it is actually a legitimate need.
>
> If that doesn't help, make sure you close any unused files and streams in
> your code. It will also be easier to help diagnose the issue if you send an
> error-reproducing snippet.
>
>
>
>
>
> --
> Regards,
> Vijay Gharge
>
>
>
>
>
>
> --
>
> *Chris Fregly*
> Principal Data Solutions Engineer
> IBM Spark Technology Center, San Francisco, CA
> http://spark.tc | http://advancedspark.com
>
>
>
>
>
>


Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2016-01-05 Thread Annabel Melongo
Vijay,
Are you closing the FileInputStream at the end of each loop (in.close())? My 
guess is that those streams aren't closed, and thus the "too many open files" 
exception.

On Tuesday, January 5, 2016 8:03 AM, Priya Ch 
 wrote:
 

 Can some one throw light on this ?
Regards,Padma Ch
On Mon, Dec 28, 2015 at 3:59 PM, Priya Ch  wrote:

Chris, we are using spark 1.3.0 version. we have not set  
spark.streaming.concurrentJobs this parameter. It takes the default value.
Vijay,
  From the tack trace it is evident that 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
 is throwing the exception. I opened the spark source code and visited the line 
which is throwing this exception i.e  


The lie which is marked in red is throwing the exception. The file is 
ExternalSorter.scala in org.apache.spark.util.collection package.
i went through the following blog 
http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/
 and understood that there is merge factor which decide the number of on-disk 
files that could be merged. Is it some way related to this ?
Regards,Padma CH
On Fri, Dec 25, 2015 at 7:51 PM, Chris Fregly  wrote:

and which version of Spark/Spark Streaming are you using?
are you explicitly setting the spark.streaming.concurrentJobs to something 
larger than the default of 1?  
if so, please try setting that back to 1 and see if the problem still exists.  
this is a dangerous parameter to modify from the default - which is why it's 
not well-documented.

On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge  wrote:

Few indicators -
1) during execution time - check total number of open files using lsof command. 
Need root permissions. If it is cluster not sure much !2) which exact line in 
the code is triggering this error ? Can you paste that snippet ?

On Wednesday 23 December 2015, Priya Ch  wrote:

ulimit -n 65000
fs.file-max = 65000 ( in etc/sysctl.conf file)
Thanks,Padma Ch
On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma  wrote:

Could you share the ulimit for your setup please ? - Thanks, via mobile,  
excuse brevity. On Dec 22, 2015 6:39 PM, "Priya Ch" 
 wrote:

Jakob,     Increased the settings like fs.file-max in /etc/sysctl.conf and also 
increased user limit in /etc/security/limits.conf. But still see the same issue.
On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky  wrote:

It might be a good idea to see how many files are open and try increasing the 
open file limit (this is done on an os level). In some application use-cases it 
is actually a legitimate need.

If that doesn't help, make sure you close any unused files and streams in your 
code. It will also be easier to help diagnose the issue if you send an 
error-reproducing snippet.








-- 
Regards,Vijay Gharge







-- 

Chris Fregly
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com





  

Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2016-01-05 Thread Priya Ch
Can someone throw light on this?

Regards,
Padma Ch

On Mon, Dec 28, 2015 at 3:59 PM, Priya Ch 
wrote:

> Chris, we are using spark 1.3.0 version. we have not set  
> spark.streaming.concurrentJobs
> this parameter. It takes the default value.
>
> Vijay,
>
>   From the tack trace it is evident that 
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
> is throwing the exception. I opened the spark source code and visited the
> line which is throwing this exception i.e
>
> [image: Inline image 1]
>
> The lie which is marked in red is throwing the exception. The file is
> ExternalSorter.scala in org.apache.spark.util.collection package.
>
> i went through the following blog
> http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/
> and understood that there is merge factor which decide the number of
> on-disk files that could be merged. Is it some way related to this ?
>
> Regards,
> Padma CH
>
> On Fri, Dec 25, 2015 at 7:51 PM, Chris Fregly  wrote:
>
>> and which version of Spark/Spark Streaming are you using?
>>
>> are you explicitly setting the spark.streaming.concurrentJobs to
>> something larger than the default of 1?
>>
>> if so, please try setting that back to 1 and see if the problem still
>> exists.
>>
>> this is a dangerous parameter to modify from the default - which is why
>> it's not well-documented.
>>
>>
>> On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge 
>> wrote:
>>
>>> Few indicators -
>>>
>>> 1) during execution time - check total number of open files using lsof
>>> command. Need root permissions. If it is cluster not sure much !
>>> 2) which exact line in the code is triggering this error ? Can you paste
>>> that snippet ?
>>>
>>>
>>> On Wednesday 23 December 2015, Priya Ch 
>>> wrote:
>>>
 ulimit -n 65000

 fs.file-max = 65000 ( in etc/sysctl.conf file)

 Thanks,
 Padma Ch

 On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma  wrote:

> Could you share the ulimit for your setup please ?
>
> - Thanks, via mobile,  excuse brevity.
> On Dec 22, 2015 6:39 PM, "Priya Ch" 
> wrote:
>
>> Jakob,
>>
>>Increased the settings like fs.file-max in /etc/sysctl.conf and
>> also increased user limit in /etc/security/limits.conf. But still
>> see the same issue.
>>
>> On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky 
>> wrote:
>>
>>> It might be a good idea to see how many files are open and try
>>> increasing the open file limit (this is done on an os level). In some
>>> application use-cases it is actually a legitimate need.
>>>
>>> If that doesn't help, make sure you close any unused files and
>>> streams in your code. It will also be easier to help diagnose the issue 
>>> if
>>> you send an error-reproducing snippet.
>>>
>>
>>

>>>
>>> --
>>> Regards,
>>> Vijay Gharge
>>>
>>>
>>>
>>>
>>
>>
>> --
>>
>> *Chris Fregly*
>> Principal Data Solutions Engineer
>> IBM Spark Technology Center, San Francisco, CA
>> http://spark.tc | http://advancedspark.com
>>
>
>


Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2015-12-28 Thread Priya Ch
Chris, we are using Spark version 1.3.0. We have not set the
spark.streaming.concurrentJobs
parameter; it takes the default value.

Vijay,

  From the stack trace it is evident that
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
is throwing the exception. I opened the Spark source code and visited the
line which is throwing this exception, i.e.

[image: Inline image 1]

The line which is marked in red is throwing the exception. The file is
ExternalSorter.scala in the org.apache.spark.util.collection package.

I went through the following blog
http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/
and understood that there is a merge factor which decides the number of
on-disk files that can be merged. Is it somehow related to this?

Regards,
Padma CH

On Fri, Dec 25, 2015 at 7:51 PM, Chris Fregly  wrote:

> and which version of Spark/Spark Streaming are you using?
>
> are you explicitly setting the spark.streaming.concurrentJobs to
> something larger than the default of 1?
>
> if so, please try setting that back to 1 and see if the problem still
> exists.
>
> this is a dangerous parameter to modify from the default - which is why
> it's not well-documented.
>
>
> On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge 
> wrote:
>
>> Few indicators -
>>
>> 1) during execution time - check total number of open files using lsof
>> command. Need root permissions. If it is cluster not sure much !
>> 2) which exact line in the code is triggering this error ? Can you paste
>> that snippet ?
>>
>>
>> On Wednesday 23 December 2015, Priya Ch 
>> wrote:
>>
>>> ulimit -n 65000
>>>
>>> fs.file-max = 65000 ( in etc/sysctl.conf file)
>>>
>>> Thanks,
>>> Padma Ch
>>>
>>> On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma  wrote:
>>>
 Could you share the ulimit for your setup please ?

 - Thanks, via mobile,  excuse brevity.
 On Dec 22, 2015 6:39 PM, "Priya Ch" 
 wrote:

> Jakob,
>
>Increased the settings like fs.file-max in /etc/sysctl.conf and
> also increased user limit in /etc/security/limits.conf. But still see
> the same issue.
>
> On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky 
> wrote:
>
>> It might be a good idea to see how many files are open and try
>> increasing the open file limit (this is done on an os level). In some
>> application use-cases it is actually a legitimate need.
>>
>> If that doesn't help, make sure you close any unused files and
>> streams in your code. It will also be easier to help diagnose the issue 
>> if
>> you send an error-reproducing snippet.
>>
>
>
>>>
>>
>> --
>> Regards,
>> Vijay Gharge
>>
>>
>>
>>
>
>
> --
>
> *Chris Fregly*
> Principal Data Solutions Engineer
> IBM Spark Technology Center, San Francisco, CA
> http://spark.tc | http://advancedspark.com
>


Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2015-12-25 Thread Chris Fregly
and which version of Spark/Spark Streaming are you using?

are you explicitly setting the spark.streaming.concurrentJobs to something
larger than the default of 1?

if so, please try setting that back to 1 and see if the problem still
exists.

this is a dangerous parameter to modify from the default - which is why
it's not well-documented.


On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge 
wrote:

> Few indicators -
>
> 1) during execution time - check total number of open files using lsof
> command. Need root permissions. If it is cluster not sure much !
> 2) which exact line in the code is triggering this error ? Can you paste
> that snippet ?
>
>
> On Wednesday 23 December 2015, Priya Ch 
> wrote:
>
>> ulimit -n 65000
>>
>> fs.file-max = 65000 ( in etc/sysctl.conf file)
>>
>> Thanks,
>> Padma Ch
>>
>> On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma  wrote:
>>
>>> Could you share the ulimit for your setup please ?
>>>
>>> - Thanks, via mobile,  excuse brevity.
>>> On Dec 22, 2015 6:39 PM, "Priya Ch" 
>>> wrote:
>>>
 Jakob,

Increased the settings like fs.file-max in /etc/sysctl.conf and
 also increased user limit in /etc/security/limits.conf. But still see
 the same issue.

 On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky 
 wrote:

> It might be a good idea to see how many files are open and try
> increasing the open file limit (this is done on an os level). In some
> application use-cases it is actually a legitimate need.
>
> If that doesn't help, make sure you close any unused files and streams
> in your code. It will also be easier to help diagnose the issue if you 
> send
> an error-reproducing snippet.
>


>>
>
> --
> Regards,
> Vijay Gharge
>
>
>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


java.io.FileNotFoundException(Too many open files) in Spark streaming

2015-12-23 Thread Vijay Gharge
A few indicators -

1) During execution, check the total number of open files using the lsof
command (needs root permissions; see the example below). If it is a cluster,
I am not sure how much this helps.
2) Which exact line in the code is triggering this error? Can you paste
that snippet?
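For example (the PID is illustrative; lsof may need root to see everything):

# total file handles currently held by one executor process
lsof -p <executor-pid> | wc -l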

On Wednesday 23 December 2015, Priya Ch > wrote:

> ulimit -n 65000
>
> fs.file-max = 65000 ( in etc/sysctl.conf file)
>
> Thanks,
> Padma Ch
>
> On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma  wrote:
>
>> Could you share the ulimit for your setup please ?
>>
>> - Thanks, via mobile,  excuse brevity.
>> On Dec 22, 2015 6:39 PM, "Priya Ch"  wrote:
>>
>>> Jakob,
>>>
>>>Increased the settings like fs.file-max in /etc/sysctl.conf and also
>>> increased user limit in /etc/security/limits.conf. But still see the
>>> same issue.
>>>
>>> On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky 
>>> wrote:
>>>
 It might be a good idea to see how many files are open and try
 increasing the open file limit (this is done on an os level). In some
 application use-cases it is actually a legitimate need.

 If that doesn't help, make sure you close any unused files and streams
 in your code. It will also be easier to help diagnose the issue if you send
 an error-reproducing snippet.

>>>
>>>
>

-- 
Regards,
Vijay Gharge


Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2015-12-23 Thread Priya Ch
ulimit -n 65000

fs.file-max = 65000 (in /etc/sysctl.conf)
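A minimal way to apply and verify that sysctl setting without a reboot, assuming the line is
already in /etc/sysctl.conf:

sysctl -p                     # reload /etc/sysctl.conf
cat /proc/sys/fs/file-max     # confirm the system-wide limit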

Thanks,
Padma Ch

On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma  wrote:

> Could you share the ulimit for your setup please ?
>
> - Thanks, via mobile,  excuse brevity.
> On Dec 22, 2015 6:39 PM, "Priya Ch"  wrote:
>
>> Jakob,
>>
>>Increased the settings like fs.file-max in /etc/sysctl.conf and also
>> increased user limit in /etc/security/limits.conf. But still see the
>> same issue.
>>
>> On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky 
>> wrote:
>>
>>> It might be a good idea to see how many files are open and try
>>> increasing the open file limit (this is done on an os level). In some
>>> application use-cases it is actually a legitimate need.
>>>
>>> If that doesn't help, make sure you close any unused files and streams
>>> in your code. It will also be easier to help diagnose the issue if you send
>>> an error-reproducing snippet.
>>>
>>
>>


Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2015-12-22 Thread Yash Sharma
Could you share the ulimit for your setup please ?

- Thanks, via mobile,  excuse brevity.
On Dec 22, 2015 6:39 PM, "Priya Ch"  wrote:

> Jakob,
>
>Increased the settings like fs.file-max in /etc/sysctl.conf and also
> increased user limit in /etc/security/limits.conf. But still see the same
> issue.
>
> On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky 
> wrote:
>
>> It might be a good idea to see how many files are open and try increasing
>> the open file limit (this is done on an os level). In some application
>> use-cases it is actually a legitimate need.
>>
>> If that doesn't help, make sure you close any unused files and streams in
>> your code. It will also be easier to help diagnose the issue if you send an
>> error-reproducing snippet.
>>
>
>


Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2015-12-22 Thread Priya Ch
Jakob,

   I increased settings like fs.file-max in /etc/sysctl.conf and also
increased the user limit in /etc/security/limits.conf, but I still see the same
issue.

On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky  wrote:

> It might be a good idea to see how many files are open and try increasing
> the open file limit (this is done on an os level). In some application
> use-cases it is actually a legitimate need.
>
> If that doesn't help, make sure you close any unused files and streams in
> your code. It will also be easier to help diagnose the issue if you send an
> error-reproducing snippet.
>


Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2015-12-17 Thread Jakob Odersky
It might be a good idea to see how many files are open and try increasing
the open file limit (this is done at the OS level). In some application
use-cases it is actually a legitimate need.

If that doesn't help, make sure you close any unused files and streams in
your code. It will also be easier to help diagnose the issue if you send an
error-reproducing snippet.
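For example, the soft and hard open-file limits in effect for the current shell (and inherited
by anything launched from it) can be checked with:

ulimit -Sn    # soft limit
ulimit -Hn    # hard limit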


java.io.FileNotFoundException(Too many open files) in Spark streaming

2015-12-17 Thread Priya Ch
Hi All,


  When running a streaming application, I am seeing the error below:


java.io.FileNotFoundException:
/data1/yarn/nm/usercache/root/appcache/application_1450172646510_0004/blockmgr-a81f42cd-6b52-4704-83f3-2cfc12a11b86/02/temp_shuffle_589ddccf-d436-4d2c-9935-e5f8c137b54b
(Too many open files)

at java.io.FileInputStream.open(Native Method)

at java.io.FileInputStream.<init>(FileInputStream.java:146)

at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)

at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)

at
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:729)

at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:68)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

at org.apache.spark.scheduler.Task.run(Task.scala:64)

at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

It looks like the issue is that, in a multi-threaded application, too many
file handles are opened and the maximum number of file handles has been
reached.

Regards,
Padma Ch


Re: "Too many open files" exception on reduceByKey

2015-10-11 Thread Tian Zhang
It turns out that Mesos can override the OS ulimit -n setting, so we have
increased the Mesos slave's ulimit -n setting.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p25019.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: "Too many open files" exception on reduceByKey

2015-10-09 Thread tian zhang
You are right, I did find that Mesos overwrites this to a smaller number. So we 
will modify that and try to run again. Thanks!
Tian


 On Thursday, October 8, 2015 4:18 PM, DB Tsai  wrote:
   

 Try to run to see actual ulimit. We found that mesos overrides the ulimit 
which causes the issue.
import sys.process._
val p = 1 to 100
val rdd = sc.parallelize(p, 100)
val a = rdd.map(x=> Seq("sh", "-c", "ulimit -n").!!.toDouble.toLong).collect



Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D

On Thu, Oct 8, 2015 at 3:22 PM, Tian Zhang  wrote:

I hit this issue with spark 1.3.0 stateful application (with
updateStateByKey) function on mesos.  It will
fail after running fine for about 24 hours.
The error stack trace as below, I checked ulimit -n and we have very large
numbers set on the machines.
What else can be wrong?
15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage
113727.0 (TID 833758, ip-10-112-10-221.ec2.internal):
java.io.FileNotFoundException:
/media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index
(Too many open files)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.(FileOutputStream.java:221)
        at java.io.FileOutputStream.(FileOutputStream.java:171)
        at
org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85)
        at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p24985.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.






  

Re: "Too many open files" exception on reduceByKey

2015-10-08 Thread DB Tsai
Try running this to see the actual ulimit. We found that Mesos overrides the
ulimit, which causes the issue.

import sys.process._
val p = 1 to 100
val rdd = sc.parallelize(p, 100)
val a = rdd.map(x=> Seq("sh", "-c", "ulimit -n").!!.toDouble.toLong).collect




Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D
<https://pgp.mit.edu/pks/lookup?search=0x59DF55B8AF08DF8D>

On Thu, Oct 8, 2015 at 3:22 PM, Tian Zhang  wrote:

> I hit this issue with spark 1.3.0 stateful application (with
> updateStateByKey) function on mesos.  It will
> fail after running fine for about 24 hours.
> The error stack trace as below, I checked ulimit -n and we have very large
> numbers set on the machines.
> What else can be wrong?
> 15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage
> 113727.0 (TID 833758, ip-10-112-10-221.ec2.internal):
> java.io.FileNotFoundException:
>
> /media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index
> (Too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.(FileOutputStream.java:221)
> at java.io.FileOutputStream.(FileOutputStream.java:171)
> at
>
> org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85)
> at
>
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p24985.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: "Too many open files" exception on reduceByKey

2015-10-08 Thread Tian Zhang
I hit this issue with a Spark 1.3.0 stateful application (using the
updateStateByKey function) on Mesos. It
fails after running fine for about 24 hours.
The error stack trace is below. I checked ulimit -n and we have very large
numbers set on the machines.
What else can be wrong?
15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage
113727.0 (TID 833758, ip-10-112-10-221.ec2.internal):
java.io.FileNotFoundException:
/media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index
(Too many open files)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
at
org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p24985.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Too many open files issue

2015-09-03 Thread Sigurd Knippenberg
I don't think that is the issue. I have it set up to run in a thread pool,
but I have set the pool size to 1 for this test until I get this resolved.
I am having some problems using the Spark web portal, since it picks a
random port and, with the way my environment is set up, by the time I
have figured out which port it is using the job has finished. But what I
did do is add some logging, and I also collect the RDD record count to
make sure the last logging statements were in fact executed after the RDD
was processed. I added the logging statements to the job flow:

val directories = findModifiedFiles()
directories.foreach(directory => {
  log(s"Starting directory processor for $directory")
  val rdd = sparkContext.newAPIHadoopFile(directory)
    .filter(...)
    .map(...)
    .reduceByKey(...)

  rdd.foreachPartition(iterator => {
    iterator.foreach(tuple => {
      // send data to kafka
    })
  })

  val count = rdd.count
  log(s"Processed $count records for $directory")
  log(s"Finished directory processor for $directory")
})

This results in the following log lines until the "Too many open files in system"
errors start happening, after which only the first log line is printed for
each iteration (as expected, since it's throwing an exception).

Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/15
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/15
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/15
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/16
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/16
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/16
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/17
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/17
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/17
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/18
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/18
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/18
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/19
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/19
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/19
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/20

Just to see what would happen if I made the job run slower and had GC run
more frequently (in case it had something to do with that), I added the
following to each loop iteration:
  System.gc()
  Thread.sleep(5000)

But besides making the job run a lot longer it did not change anything.

Sigurd


On Wed, Sep 2, 2015 at 9:40 AM, Saisai Shao  wrote:

> Here is the code in which NewHadoopRDD register close handler and be
> called when the task is completed (
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136
> ).
>
> From my understanding, possibly the reason is that this `foreach` code in
> your implementation is not executed Spark job one by one in loop as
> expected, on the contrary all the jobs are submitted to the DAGScheduler
> simultaneously, since each job has no dependency to others, Spark's
> scheduler will unwrap the loop and submit jobs in parallelism, so maybe
> several map stages are running and pending, this makes your node out of
> file handler.
>
> You could check Spark web portal to see if there's several map stages
> running simultaneously, or some of them are running while others are
> pending.
>
> Thanks
> Jerry
>
>
> On Wed, Sep 2, 2015 at 9:09 PM, Sigurd Knippenberg  > wrote:
>
>> Yep. I know. It's was set to 32K when I ran this test. If I bump it to
>> 64K the issue goes away. It still doesn't make sense to me that the Spark
>> job doesn't release its file handles until the end of the job instead of
>> doing that while my loop iterates.
>>
>> Sigurd
>>
>> On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran 
>> wrote:
>>
>>>
>>> On 31 Aug 2015, at 19:49, Sigurd Knippenberg 
>>> wrote:
>>>
>>> I know I can adjust the max open files allowed by the OS but I'd rather
>>> fix the underlaying issue.
>>>
>>>
>>>
>>> bumping up the OS handle limits is step #1 of installing a hadoop cluster
>>>
>>> https://wiki.apache.org/hadoop/TooManyOpenFiles
>>>
>>
>>
>


Re: Too many open files issue

2015-09-02 Thread Saisai Shao
Here is the code in which NewHadoopRDD register close handler and be called
when the task is completed (
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136
).

From my understanding, possibly the reason is that the Spark jobs launched by
this `foreach` code in your implementation are not executed one by one in the
loop as expected; on the contrary, all the jobs are submitted to the DAGScheduler
simultaneously. Since each job has no dependency on the others, Spark's
scheduler will unwrap the loop and submit the jobs in parallel, so maybe
several map stages are running and pending, and this makes your node run out of
file handles.

You could check the Spark web portal to see whether several map stages are
running simultaneously, or some of them are running while others are
pending.

Thanks
Jerry


On Wed, Sep 2, 2015 at 9:09 PM, Sigurd Knippenberg 
wrote:

> Yep. I know. It's was set to 32K when I ran this test. If I bump it to 64K
> the issue goes away. It still doesn't make sense to me that the Spark job
> doesn't release its file handles until the end of the job instead of doing
> that while my loop iterates.
>
> Sigurd
>
> On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran 
> wrote:
>
>>
>> On 31 Aug 2015, at 19:49, Sigurd Knippenberg 
>> wrote:
>>
>> I know I can adjust the max open files allowed by the OS but I'd rather
>> fix the underlaying issue.
>>
>>
>>
>> bumping up the OS handle limits is step #1 of installing a hadoop cluster
>>
>> https://wiki.apache.org/hadoop/TooManyOpenFiles
>>
>
>


Re: Too many open files issue

2015-09-02 Thread Steve Loughran
ah, now that does sound suspicious...

On 2 Sep 2015, at 14:09, Sigurd Knippenberg 
mailto:sig...@knippenberg.com>> wrote:

Yep. I know. It's was set to 32K when I ran this test. If I bump it to 64K the 
issue goes away. It still doesn't make sense to me that the Spark job doesn't 
release its file handles until the end of the job instead of doing that while 
my loop iterates.

Sigurd

On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran 
mailto:ste...@hortonworks.com>> wrote:

On 31 Aug 2015, at 19:49, Sigurd Knippenberg 
mailto:sig...@knippenberg.com>> wrote:

I know I can adjust the max open files allowed by the OS but I'd rather fix the 
underlaying issue.


bumping up the OS handle limits is step #1 of installing a hadoop cluster

https://wiki.apache.org/hadoop/TooManyOpenFiles




Re: Too many open files issue

2015-09-02 Thread Sigurd Knippenberg
Yep, I know. It was set to 32K when I ran this test. If I bump it to 64K
the issue goes away. It still doesn't make sense to me that the Spark job
doesn't release its file handles until the end of the job instead of doing
that while my loop iterates.

Sigurd

On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran 
wrote:

>
> On 31 Aug 2015, at 19:49, Sigurd Knippenberg 
> wrote:
>
> I know I can adjust the max open files allowed by the OS but I'd rather
> fix the underlying issue.
>
>
>
> bumping up the OS handle limits is step #1 of installing a hadoop cluster
>
> https://wiki.apache.org/hadoop/TooManyOpenFiles
>


Re: Too many open files issue

2015-09-02 Thread Steve Loughran

On 31 Aug 2015, at 19:49, Sigurd Knippenberg wrote:

I know I can adjust the max open files allowed by the OS but I'd rather fix the 
underlying issue.


bumping up the OS handle limits is step #1 of installing a hadoop cluster

https://wiki.apache.org/hadoop/TooManyOpenFiles


Too many open files issue

2015-08-31 Thread Sigurd Knippenberg
I am running into a 'too many open files' issue, and before I posted this I
searched the web to see if anyone already had a solution to my particular
problem, but I did not find anything that helped.

I know I can adjust the max open files allowed by the OS but I'd rather fix
the underlying issue.

In HDFS I have a directory that contains hourly files in a directory
structure like this: directoryname/hourly/2015/06/02/13 (where the numbers
at the end are the date in YYYY/MM/DD/HH format).

I have a Spark job that roughly does the following:

val directories = findModifiedFiles()
directories.foreach(directory => {
  sparkContext.newAPIHadoopFile(directory)
    .filter(...)
    .map(...)
    .reduceByKey(...)
    .foreachPartition(iterator => {
      iterator.foreach(tuple => {
        // send data to kafka
      })
    })
})

If there are only a few directories that have been modified then this works
pretty well. But when I have the job reprocess all the data (I have 350M of
test data that pretty much has data for each hour of each day for a full
year) I run out of file handles.

I executed the test on a test cluster of 2 hadoop slave nodes that each
have the HDFS data node and yarn node manager running.

When I run "lsof -p" on the Spark processes, I see a lot of the following
types of open files:

java21196 yarn 3268r   REG   8,16   139  533320
/hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir17/blk_1073746324_5500.meta

java21196 yarn 3269r   REG   8,16 15004  533515
/hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir17/blk_1073746422

java21196 yarn 3270r   REG   8,16   127  533516
/hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir17/blk_1073746422_5598.meta

java21196 yarn 3271r   REG   8,16 15583  534081
/hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir19/blk_1073746704

java21196 yarn 3272r   REG   8,16   131  534082
/hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir19/blk_1073746704_5880.meta

When I watch the process run, I can see that the number of those open files
is ever increasing while it is processing directories until it runs out of
file handles (it then drops to zero and starts up again until it runs out
again but that is due to the fact that Yarn retries running the job). It
basically ends up opening about 4500 file handles to those files per node.

As I said, I know that I can increase the number of open file handles, and
I will do that, but in my opinion it should not keep increasing. I would
have thought that when I was done with an RDD, Spark would close all the
resources that it opened for it (so that it would close the file handles
after each execution of the directories.foreach loop). I looked to see if
there was a close() method or something like that for the RDD but couldn't
find one.

Am I doing something that is causing Spark not to close the file handles?
Should I write this job differently?
Thanks,

Sigurd
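
One restructuring that sometimes helps with this shape of workload, offered only
as a sketch and not as the fix the thread settled on: hand Spark all of the
modified directories in a single job instead of one job per directory, so there
is one scan over the input and far fewer simultaneously open blocks. The sketch
assumes plain-text input (so textFile can take a comma-separated list of paths);
findModifiedFiles, parseLine and sendToKafka are placeholders standing in for
the poster's own helpers.

// Sketch: one job over all modified directories instead of one job each.
val directories: Seq[String] = findModifiedFiles()

if (directories.nonEmpty) {
  sparkContext.textFile(directories.mkString(","))  // comma-separated paths
    .map(line => parseLine(line))                   // placeholder: yields (key, value)
    .reduceByKey(_ + _)
    .foreachPartition { iterator =>
      iterator.foreach(tuple => sendToKafka(tuple)) // placeholder Kafka sink
    }
}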


RE: Too many open files

2015-07-29 Thread Paul Röwer
Maybe you forgot to close a reader or writer object.

On 29 July 2015 at 18:04:59 CEST, saif.a.ell...@wellsfargo.com wrote:
>Thank you both, I will take a look, but
>
>
>1.   For high-shuffle tasks, is this right for the system to have
>the size and thresholds high? I hope there is no bad consequences.
>
>2.   I will try to overlook admin access and see if I can get
>anything with only user rights
>
>From: Ted Yu [mailto:yuzhih...@gmail.com]
>Sent: Wednesday, July 29, 2015 12:59 PM
>To: Ellafi, Saif A.
>Cc: 
>Subject: Re: Too many open files
>
>Please increase limit for open files:
>
>http://stackoverflow.com/questions/34588/how-do-i-change-the-number-of-open-files-limit-in-linux
>
>
>On Jul 29, 2015, at 8:39 AM, saif.a.ell...@wellsfargo.com wrote:
>Hello,
>
>I’ve seen a couple emails on this issue but could not find anything to
>solve my situation.
>
>Tried to reduce the partitioning level, enable consolidateFiles and
>increase the sizeInFlight limit, but still no help. Spill manager is
>sort, which is the default, any advice?
>
>15/07/29 10:37:01 WARN TaskSetManager: Lost task 34.0 in stage 11.0
>(TID 331, localhost): FetchFailed(BlockManagerId(driver, localhost,
>43437), shuffleId=3, mapId=0, reduceId=34, message=
>org.apache.spark.shuffle.FetchFailedException:
>/tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/0d/shuffle_3_0_0.index
>(Too many open files)
>..
>..
>15/07/29 10:37:01 INFO Executor: Executor is trying to kill task 9.0 in
>stage 11.0 (TID 306)
>org.apache.spark.SparkException: Job aborted due to stage failure: Task
>20 in stage 11.0 failed 1 times, most recent failure: Lost task 20.0 in
>stage 11.0 (TID 317, localhost): java.io.FileNotFoundException:
>/tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/1b/temp_shuffle_a3a9815a-677a-4342-94a2-1e083d758bcc
>(Too many open files)
>
>my fs is ext4 and currently ulimit -n is 1024
>
>Thanks
>Saif

-- 
This message was sent from my Android mobile phone with K-9 Mail.

RE: Too many open files

2015-07-29 Thread Saif.A.Ellafi
Thank you both, I will take a look, but


1.   For high-shuffle tasks, is it right for the system to have the sizes 
and thresholds this high? I hope there are no bad consequences.

2.   I will try to work without admin access and see if I can get anything 
done with only user rights

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Wednesday, July 29, 2015 12:59 PM
To: Ellafi, Saif A.
Cc: 
Subject: Re: Too many open files

Please increase limit for open files:

http://stackoverflow.com/questions/34588/how-do-i-change-the-number-of-open-files-limit-in-linux


On Jul 29, 2015, at 8:39 AM, saif.a.ell...@wellsfargo.com wrote:
Hello,

I’ve seen a couple emails on this issue but could not find anything to solve my 
situation.

Tried to reduce the partitioning level, enable consolidateFiles and increase 
the sizeInFlight limit, but still no help. Spill manager is sort, which is the 
default, any advice?

15/07/29 10:37:01 WARN TaskSetManager: Lost task 34.0 in stage 11.0 (TID 331, 
localhost): FetchFailed(BlockManagerId(driver, localhost, 43437), shuffleId=3, 
mapId=0, reduceId=34, message=
org.apache.spark.shuffle.FetchFailedException: 
/tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/0d/shuffle_3_0_0.index
 (Too many open files)
..
..
15/07/29 10:37:01 INFO Executor: Executor is trying to kill task 9.0 in stage 
11.0 (TID 306)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in 
stage 11.0 failed 1 times, most recent failure: Lost task 20.0 in stage 11.0 
(TID 317, localhost): java.io.FileNotFoundException: 
/tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/1b/temp_shuffle_a3a9815a-677a-4342-94a2-1e083d758bcc
 (Too many open files)

my fs is ext4 and currently ulimit -n is 1024

Thanks
Saif



Re: Too many open files

2015-07-29 Thread Ted Yu
Please increase limit for open files:

http://stackoverflow.com/questions/34588/how-do-i-change-the-number-of-open-files-limit-in-linux



> On Jul 29, 2015, at 8:39 AM,  
>  wrote:
> 
> Hello,
>  
> I’ve seen a couple emails on this issue but could not find anything to solve 
> my situation.
>  
> Tried to reduce the partitioning level, enable consolidateFiles and increase 
> the sizeInFlight limit, but still no help. Spill manager is sort, which is 
> the default, any advice?
>  
> 15/07/29 10:37:01 WARN TaskSetManager: Lost task 34.0 in stage 11.0 (TID 331, 
> localhost): FetchFailed(BlockManagerId(driver, localhost, 43437), 
> shuffleId=3, mapId=0, reduceId=34, message=
> org.apache.spark.shuffle.FetchFailedException: 
> /tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/0d/shuffle_3_0_0.index
>  (Too many open files)
> ..
> ..
> 15/07/29 10:37:01 INFO Executor: Executor is trying to kill task 9.0 in stage 
> 11.0 (TID 306)
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in 
> stage 11.0 failed 1 times, most recent failure: Lost task 20.0 in stage 11.0 
> (TID 317, localhost): java.io.FileNotFoundException: 
> /tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/1b/temp_shuffle_a3a9815a-677a-4342-94a2-1e083d758bcc
>  (Too many open files)
>  
> my fs is ext4 and currently ulimit -n is 1024
>  
> Thanks
> Saif
>  


Re: Too many open files

2015-07-29 Thread Igor Berman
you probably should increase the file handle limit for the user that all the
processes are running as (Spark master & workers),
e.g.
http://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/
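
One way to confirm which limit the Spark JVMs actually see (rather than the
shell you launched from) is to print it from inside a job. A small sketch,
assuming a SparkContext named sc and a HotSpot JVM on Unix (the
com.sun.management.UnixOperatingSystemMXBean API is specific to that setup):

import java.lang.management.ManagementFactory
import com.sun.management.UnixOperatingSystemMXBean

val fdReport = sc.parallelize(1 to sc.defaultParallelism, sc.defaultParallelism)
  .mapPartitions { _ =>
    ManagementFactory.getOperatingSystemMXBean match {
      case os: UnixOperatingSystemMXBean =>
        Iterator(java.net.InetAddress.getLocalHost.getHostName +
          ": open=" + os.getOpenFileDescriptorCount +
          " max=" + os.getMaxFileDescriptorCount)
      case _ =>
        Iterator("file descriptor counts not available on this JVM")
    }
  }
  .collect()

fdReport.foreach(println)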

On 29 July 2015 at 18:39,  wrote:

>  Hello,
>
> I’ve seen a couple emails on this issue but could not find anything to
> solve my situation.
>
> Tried to reduce the partitioning level, enable consolidateFiles and
> increase the sizeInFlight limit, but still no help. Spill manager is sort,
> which is the default, any advice?
>
> 15/07/29 10:37:01 WARN TaskSetManager: Lost task 34.0 in stage 11.0 (TID
> 331, localhost): FetchFailed(BlockManagerId(driver, localhost, 43437),
> shuffleId=3, mapId=0, reduceId=34, message=
> org.apache.spark.shuffle.FetchFailedException:
> /tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/0d/shuffle_3_0_0.index
> (Too many open files)
> ..
> ..
> 15/07/29 10:37:01 INFO Executor: Executor is trying to kill task 9.0 in
> stage 11.0 (TID 306)
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 20
> in stage 11.0 failed 1 times, most recent failure: Lost task 20.0 in stage
> 11.0 (TID 317, localhost): java.io.FileNotFoundException:
> /tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/1b/temp_shuffle_a3a9815a-677a-4342-94a2-1e083d758bcc
> (Too many open files)
>
> my fs is ext4 and currently ulimit -n is 1024
>
> Thanks
> Saif
>
>


Too many open files

2015-07-29 Thread Saif.A.Ellafi
Hello,

I've seen a couple emails on this issue but could not find anything to solve my 
situation.

Tried to reduce the partitioning level, enable consolidateFiles and increase 
the sizeInFlight limit, but still no help. Spill manager is sort, which is the 
default, any advice?

15/07/29 10:37:01 WARN TaskSetManager: Lost task 34.0 in stage 11.0 (TID 331, 
localhost): FetchFailed(BlockManagerId(driver, localhost, 43437), shuffleId=3, 
mapId=0, reduceId=34, message=
org.apache.spark.shuffle.FetchFailedException: 
/tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/0d/shuffle_3_0_0.index
 (Too many open files)
..
..
15/07/29 10:37:01 INFO Executor: Executor is trying to kill task 9.0 in stage 
11.0 (TID 306)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in 
stage 11.0 failed 1 times, most recent failure: Lost task 20.0 in stage 11.0 
(TID 317, localhost): java.io.FileNotFoundException: 
/tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/1b/temp_shuffle_a3a9815a-677a-4342-94a2-1e083d758bcc
 (Too many open files)

my fs is ext4 and currently ulimit -n is 1024

Thanks
Saif
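
For anyone trying to reproduce the tuning described above, a sketch of the
corresponding Spark 1.x configuration keys (key names and defaults vary between
releases, so check the configuration page for the version in use; the values
here are illustrative, not recommendations):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-tuning-sketch")
  .setMaster("local[4]")
  .set("spark.shuffle.manager", "sort")            // sort-based shuffle, the 1.2+ default
  .set("spark.shuffle.consolidateFiles", "true")   // only affects the hash shuffle manager
  .set("spark.reducer.maxSizeInFlight", "96m")     // per-reducer fetch buffer (1.4+ key name)

val sc = new SparkContext(conf)
// Fewer shuffle partitions also means fewer shuffle files open at once.
val counts = sc.parallelize(1 to 1000000, 8)
  .map(n => (n % 100, 1))
  .reduceByKey(_ + _, 8)
println(counts.count())
sc.stop()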



Re: Too many open files when using Spark to consume messages from Kafka

2015-04-30 Thread Bill Jay
I terminated the old job and started a new one. Currently, the Spark
Streaming job has been running for 2 hours, and when I use lsof, I do not
see many files related to the Spark job.

BTW, I am running Spark Streaming in local[2] mode. The batch interval is 5
seconds and there are around 50 lines to read in each batch.

On Thu, Apr 30, 2015 at 11:15 AM, Cody Koeninger  wrote:

> Did you use lsof to see what files were opened during the job?
>
> On Thu, Apr 30, 2015 at 1:05 PM, Bill Jay 
> wrote:
>
>> The data ingestion is in outermost portion in foreachRDD block. Although
>> now I close the statement of jdbc, the same exception happened again. It
>> seems it is not related to the data ingestion part.
>>
>> On Wed, Apr 29, 2015 at 8:35 PM, Cody Koeninger 
>> wrote:
>>
>>> Use lsof to see what files are actually being held open.
>>>
>>> That stacktrace looks to me like it's from the driver, not executors.
>>> Where in foreach is it being called?  The outermost portion of foreachRDD
>>> runs in the driver, the innermost portion runs in the executors.  From the
>>> docs:
>>>
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html
>>>
>>> dstream.foreachRDD { rdd =>
>>>   val connection = createNewConnection()  // executed at the driver
>>>   rdd.foreach { record =>
>>> connection.send(record) // executed at the worker
>>>   }}
>>>
>>>
>>> @td I've specifically looked at kafka socket connections for the
>>> standard 1.3 code vs my branch that has cached connections.  The standard
>>> non-caching code has very short lived connections.  I've had jobs running
>>> for a month at a time, including ones writing to mysql.  Not saying it's
>>> impossible, but I'd think we need some evidence before speculating this has
>>> anything to do with it.
>>>
>>>
>>> On Wed, Apr 29, 2015 at 6:50 PM, Bill Jay 
>>> wrote:
>>>
>>>> This function is called in foreachRDD. I think it should be running in
>>>> the executors. I add the statement.close() in the code and it is running. I
>>>> will let you know if this fixes the issue.
>>>>
>>>>
>>>>
>>>> On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das 
>>>> wrote:
>>>>
>>>>> Is the function ingestToMysql running on the driver or on the
>>>>> executors? Accordingly you can try debugging while running in a 
>>>>> distributed
>>>>> manner, with and without calling the function.
>>>>>
>>>>> If you dont get "too many open files" without calling ingestToMysql(),
>>>>> the problem is likely to be in ingestToMysql().
>>>>> If you get the problem even without calling ingestToMysql(), then the
>>>>> problem may be in Kafka. If the problem is occuring in the driver, then 
>>>>> its
>>>>> the DirecKafkaInputDStream code. If the problem is occurring in the
>>>>> executor, then the problem is in KafkaRDD.
>>>>>
>>>>> TD
>>>>>
>>>>> On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu  wrote:
>>>>>
>>>>>> Maybe add statement.close() in finally block ?
>>>>>>
>>>>>> Streaming / Kafka experts may have better insight.
>>>>>>
>>>>>> On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay >>>>> > wrote:
>>>>>>
>>>>>>> Thanks for the suggestion. I ran the command and the limit is 1024.
>>>>>>>
>>>>>>> Based on my understanding, the connector to Kafka should not open so
>>>>>>> many files. Do you think there is possible socket leakage? BTW, in every
>>>>>>> batch which is 5 seconds, I output some results to mysql:
>>>>>>>
>>>>>>>   def ingestToMysql(data: Array[String]) {
>>>>>>> val url =
>>>>>>> "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
>>>>>>> var sql = "insert into loggingserver1 values "
>>>>>>> data.foreach(line => sql += line)
>>>>>>> sql = sql.dropRight(1)
>>>>>>> sql += ";"
>>>>>>> logger.info(sql)
>>>>>>> var conn: java.sql.Connection = null
>>>>>>> try {
>>>>>

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-30 Thread Cody Koeninger
Did you use lsof to see what files were opened during the job?

On Thu, Apr 30, 2015 at 1:05 PM, Bill Jay 
wrote:

> The data ingestion is in outermost portion in foreachRDD block. Although
> now I close the statement of jdbc, the same exception happened again. It
> seems it is not related to the data ingestion part.
>
> On Wed, Apr 29, 2015 at 8:35 PM, Cody Koeninger 
> wrote:
>
>> Use lsof to see what files are actually being held open.
>>
>> That stacktrace looks to me like it's from the driver, not executors.
>> Where in foreach is it being called?  The outermost portion of foreachRDD
>> runs in the driver, the innermost portion runs in the executors.  From the
>> docs:
>>
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html
>>
>> dstream.foreachRDD { rdd =>
>>   val connection = createNewConnection()  // executed at the driver
>>   rdd.foreach { record =>
>> connection.send(record) // executed at the worker
>>   }}
>>
>>
>> @td I've specifically looked at kafka socket connections for the standard
>> 1.3 code vs my branch that has cached connections.  The standard
>> non-caching code has very short lived connections.  I've had jobs running
>> for a month at a time, including ones writing to mysql.  Not saying it's
>> impossible, but I'd think we need some evidence before speculating this has
>> anything to do with it.
>>
>>
>> On Wed, Apr 29, 2015 at 6:50 PM, Bill Jay 
>> wrote:
>>
>>> This function is called in foreachRDD. I think it should be running in
>>> the executors. I add the statement.close() in the code and it is running. I
>>> will let you know if this fixes the issue.
>>>
>>>
>>>
>>> On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das 
>>> wrote:
>>>
>>>> Is the function ingestToMysql running on the driver or on the
>>>> executors? Accordingly you can try debugging while running in a distributed
>>>> manner, with and without calling the function.
>>>>
>>>> If you dont get "too many open files" without calling ingestToMysql(),
>>>> the problem is likely to be in ingestToMysql().
>>>> If you get the problem even without calling ingestToMysql(), then the
>>>> problem may be in Kafka. If the problem is occuring in the driver, then its
>>>> the DirecKafkaInputDStream code. If the problem is occurring in the
>>>> executor, then the problem is in KafkaRDD.
>>>>
>>>> TD
>>>>
>>>> On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu  wrote:
>>>>
>>>>> Maybe add statement.close() in finally block ?
>>>>>
>>>>> Streaming / Kafka experts may have better insight.
>>>>>
>>>>> On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay 
>>>>> wrote:
>>>>>
>>>>>> Thanks for the suggestion. I ran the command and the limit is 1024.
>>>>>>
>>>>>> Based on my understanding, the connector to Kafka should not open so
>>>>>> many files. Do you think there is possible socket leakage? BTW, in every
>>>>>> batch which is 5 seconds, I output some results to mysql:
>>>>>>
>>>>>>   def ingestToMysql(data: Array[String]) {
>>>>>> val url =
>>>>>> "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
>>>>>> var sql = "insert into loggingserver1 values "
>>>>>> data.foreach(line => sql += line)
>>>>>> sql = sql.dropRight(1)
>>>>>> sql += ";"
>>>>>> logger.info(sql)
>>>>>> var conn: java.sql.Connection = null
>>>>>> try {
>>>>>>   conn = DriverManager.getConnection(url)
>>>>>>   val statement = conn.createStatement()
>>>>>>   statement.executeUpdate(sql)
>>>>>> } catch {
>>>>>>   case e: Exception => logger.error(e.getMessage())
>>>>>> } finally {
>>>>>>   if (conn != null) {
>>>>>> conn.close
>>>>>>   }
>>>>>> }
>>>>>>   }
>>>>>>
>>>>>> I am not sure whether the leakage originates from Kafka connector or
>>>>>> the sql connections.
>>>>>>
>>>>>> Bi

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-30 Thread Bill Jay
The data ingestion is in the outermost portion of the foreachRDD block.
Although I now close the jdbc statement, the same exception happened again.
It seems it is not related to the data ingestion part.

On Wed, Apr 29, 2015 at 8:35 PM, Cody Koeninger  wrote:

> Use lsof to see what files are actually being held open.
>
> That stacktrace looks to me like it's from the driver, not executors.
> Where in foreach is it being called?  The outermost portion of foreachRDD
> runs in the driver, the innermost portion runs in the executors.  From the
> docs:
>
> https://spark.apache.org/docs/latest/streaming-programming-guide.html
>
> dstream.foreachRDD { rdd =>
>   val connection = createNewConnection()  // executed at the driver
>   rdd.foreach { record =>
> connection.send(record) // executed at the worker
>   }}
>
>
> @td I've specifically looked at kafka socket connections for the standard
> 1.3 code vs my branch that has cached connections.  The standard
> non-caching code has very short lived connections.  I've had jobs running
> for a month at a time, including ones writing to mysql.  Not saying it's
> impossible, but I'd think we need some evidence before speculating this has
> anything to do with it.
>
>
> On Wed, Apr 29, 2015 at 6:50 PM, Bill Jay 
> wrote:
>
>> This function is called in foreachRDD. I think it should be running in
>> the executors. I add the statement.close() in the code and it is running. I
>> will let you know if this fixes the issue.
>>
>>
>>
>> On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das 
>> wrote:
>>
>>> Is the function ingestToMysql running on the driver or on the executors?
>>> Accordingly you can try debugging while running in a distributed manner,
>>> with and without calling the function.
>>>
>>> If you dont get "too many open files" without calling ingestToMysql(),
>>> the problem is likely to be in ingestToMysql().
>>> If you get the problem even without calling ingestToMysql(), then the
>>> problem may be in Kafka. If the problem is occuring in the driver, then its
>>> the DirecKafkaInputDStream code. If the problem is occurring in the
>>> executor, then the problem is in KafkaRDD.
>>>
>>> TD
>>>
>>> On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu  wrote:
>>>
>>>> Maybe add statement.close() in finally block ?
>>>>
>>>> Streaming / Kafka experts may have better insight.
>>>>
>>>> On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay 
>>>> wrote:
>>>>
>>>>> Thanks for the suggestion. I ran the command and the limit is 1024.
>>>>>
>>>>> Based on my understanding, the connector to Kafka should not open so
>>>>> many files. Do you think there is possible socket leakage? BTW, in every
>>>>> batch which is 5 seconds, I output some results to mysql:
>>>>>
>>>>>   def ingestToMysql(data: Array[String]) {
>>>>> val url =
>>>>> "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
>>>>> var sql = "insert into loggingserver1 values "
>>>>> data.foreach(line => sql += line)
>>>>> sql = sql.dropRight(1)
>>>>> sql += ";"
>>>>> logger.info(sql)
>>>>> var conn: java.sql.Connection = null
>>>>> try {
>>>>>   conn = DriverManager.getConnection(url)
>>>>>   val statement = conn.createStatement()
>>>>>   statement.executeUpdate(sql)
>>>>> } catch {
>>>>>   case e: Exception => logger.error(e.getMessage())
>>>>> } finally {
>>>>>   if (conn != null) {
>>>>> conn.close
>>>>>   }
>>>>> }
>>>>>   }
>>>>>
>>>>> I am not sure whether the leakage originates from Kafka connector or
>>>>> the sql connections.
>>>>>
>>>>> Bill
>>>>>
>>>>> On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu  wrote:
>>>>>
>>>>>> Can you run the command 'ulimit -n' to see the current limit ?
>>>>>>
>>>>>> To configure ulimit settings on Ubuntu, edit
>>>>>> */etc/security/limits.conf*
>>>>>> Cheers
>>>>>>
>>>>>> On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay >>>>> > wrote:
>>>

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Cody Koeninger
Use lsof to see what files are actually being held open.

That stacktrace looks to me like it's from the driver, not executors.
Where in foreach is it being called?  The outermost portion of foreachRDD
runs in the driver, the innermost portion runs in the executors.  From the
docs:

https://spark.apache.org/docs/latest/streaming-programming-guide.html

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}


@td I've specifically looked at kafka socket connections for the standard
1.3 code vs my branch that has cached connections.  The standard
non-caching code has very short lived connections.  I've had jobs running
for a month at a time, including ones writing to mysql.  Not saying it's
impossible, but I'd think we need some evidence before speculating this has
anything to do with it.
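
The same section of the streaming guide goes on to recommend creating the
connection once per partition rather than once per record. A sketch of that
pattern, with createNewConnection() and the send/close calls standing in for a
real client library rather than any particular API:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()  // executed on the worker, once per partition
    try {
      partitionOfRecords.foreach(record => connection.send(record))
    } finally {
      connection.close()                    // always give the handle back
    }
  }
}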


On Wed, Apr 29, 2015 at 6:50 PM, Bill Jay 
wrote:

> This function is called in foreachRDD. I think it should be running in the
> executors. I add the statement.close() in the code and it is running. I
> will let you know if this fixes the issue.
>
>
>
> On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das 
> wrote:
>
>> Is the function ingestToMysql running on the driver or on the executors?
>> Accordingly you can try debugging while running in a distributed manner,
>> with and without calling the function.
>>
>> If you dont get "too many open files" without calling ingestToMysql(),
>> the problem is likely to be in ingestToMysql().
>> If you get the problem even without calling ingestToMysql(), then the
>> problem may be in Kafka. If the problem is occuring in the driver, then its
>> the DirecKafkaInputDStream code. If the problem is occurring in the
>> executor, then the problem is in KafkaRDD.
>>
>> TD
>>
>> On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu  wrote:
>>
>>> Maybe add statement.close() in finally block ?
>>>
>>> Streaming / Kafka experts may have better insight.
>>>
>>> On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay 
>>> wrote:
>>>
>>>> Thanks for the suggestion. I ran the command and the limit is 1024.
>>>>
>>>> Based on my understanding, the connector to Kafka should not open so
>>>> many files. Do you think there is possible socket leakage? BTW, in every
>>>> batch which is 5 seconds, I output some results to mysql:
>>>>
>>>>   def ingestToMysql(data: Array[String]) {
>>>> val url =
>>>> "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
>>>> var sql = "insert into loggingserver1 values "
>>>> data.foreach(line => sql += line)
>>>> sql = sql.dropRight(1)
>>>> sql += ";"
>>>> logger.info(sql)
>>>> var conn: java.sql.Connection = null
>>>> try {
>>>>   conn = DriverManager.getConnection(url)
>>>>   val statement = conn.createStatement()
>>>>   statement.executeUpdate(sql)
>>>> } catch {
>>>>   case e: Exception => logger.error(e.getMessage())
>>>> } finally {
>>>>   if (conn != null) {
>>>> conn.close
>>>>   }
>>>> }
>>>>   }
>>>>
>>>> I am not sure whether the leakage originates from Kafka connector or
>>>> the sql connections.
>>>>
>>>> Bill
>>>>
>>>> On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu  wrote:
>>>>
>>>>> Can you run the command 'ulimit -n' to see the current limit ?
>>>>>
>>>>> To configure ulimit settings on Ubuntu, edit
>>>>> */etc/security/limits.conf*
>>>>> Cheers
>>>>>
>>>>> On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay 
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I am using the direct approach to receive real-time data from Kafka
>>>>>> in the following link:
>>>>>>
>>>>>> https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
>>>>>>
>>>>>>
>>>>>> My code follows the word count direct example:
>>>>>>
>>>>>>
>>>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordC

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Bill Jay
This function is called in foreachRDD. I think it should be running in the
executors. I added the statement.close() call in the code and it is running. I
will let you know if this fixes the issue.



On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das  wrote:

> Is the function ingestToMysql running on the driver or on the executors?
> Accordingly you can try debugging while running in a distributed manner,
> with and without calling the function.
>
> If you dont get "too many open files" without calling ingestToMysql(), the
> problem is likely to be in ingestToMysql().
> If you get the problem even without calling ingestToMysql(), then the
> problem may be in Kafka. If the problem is occuring in the driver, then its
> the DirecKafkaInputDStream code. If the problem is occurring in the
> executor, then the problem is in KafkaRDD.
>
> TD
>
> On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu  wrote:
>
>> Maybe add statement.close() in finally block ?
>>
>> Streaming / Kafka experts may have better insight.
>>
>> On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay 
>> wrote:
>>
>>> Thanks for the suggestion. I ran the command and the limit is 1024.
>>>
>>> Based on my understanding, the connector to Kafka should not open so
>>> many files. Do you think there is possible socket leakage? BTW, in every
>>> batch which is 5 seconds, I output some results to mysql:
>>>
>>>   def ingestToMysql(data: Array[String]) {
>>> val url =
>>> "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
>>> var sql = "insert into loggingserver1 values "
>>> data.foreach(line => sql += line)
>>> sql = sql.dropRight(1)
>>> sql += ";"
>>> logger.info(sql)
>>> var conn: java.sql.Connection = null
>>> try {
>>>   conn = DriverManager.getConnection(url)
>>>   val statement = conn.createStatement()
>>>   statement.executeUpdate(sql)
>>> } catch {
>>>   case e: Exception => logger.error(e.getMessage())
>>> } finally {
>>>   if (conn != null) {
>>> conn.close
>>>   }
>>> }
>>>   }
>>>
>>> I am not sure whether the leakage originates from Kafka connector or the
>>> sql connections.
>>>
>>> Bill
>>>
>>> On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu  wrote:
>>>
>>>> Can you run the command 'ulimit -n' to see the current limit ?
>>>>
>>>> To configure ulimit settings on Ubuntu, edit
>>>> */etc/security/limits.conf*
>>>> Cheers
>>>>
>>>> On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am using the direct approach to receive real-time data from Kafka in
>>>>> the following link:
>>>>>
>>>>> https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
>>>>>
>>>>>
>>>>> My code follows the word count direct example:
>>>>>
>>>>>
>>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
>>>>>
>>>>>
>>>>>
>>>>> After around 12 hours, I got the following error messages in Spark log:
>>>>>
>>>>> 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
>>>>> 143033869 ms
>>>>> org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
>>>>> many open files, java.io.IOException: Too many open files,
>>>>> java.io.IOException: Too many open files, java.io.IOException: Too many
>>>>> open files, java.io.IOException: Too many open files)
>>>>> at
>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>> at
>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>> at
>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>> at
>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>> at
>>>>> scala.util.DynamicVariable.withValue(DynamicVar

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Tathagata Das
Also cc'ing Cody.

@Cody maybe there is a reason for doing connection pooling even if there is
no performance difference.

TD

On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das  wrote:

> Is the function ingestToMysql running on the driver or on the executors?
> Accordingly you can try debugging while running in a distributed manner,
> with and without calling the function.
>
> If you dont get "too many open files" without calling ingestToMysql(), the
> problem is likely to be in ingestToMysql().
> If you get the problem even without calling ingestToMysql(), then the
> problem may be in Kafka. If the problem is occuring in the driver, then its
> the DirecKafkaInputDStream code. If the problem is occurring in the
> executor, then the problem is in KafkaRDD.
>
> TD
>
> On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu  wrote:
>
>> Maybe add statement.close() in finally block ?
>>
>> Streaming / Kafka experts may have better insight.
>>
>> On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay 
>> wrote:
>>
>>> Thanks for the suggestion. I ran the command and the limit is 1024.
>>>
>>> Based on my understanding, the connector to Kafka should not open so
>>> many files. Do you think there is possible socket leakage? BTW, in every
>>> batch which is 5 seconds, I output some results to mysql:
>>>
>>>   def ingestToMysql(data: Array[String]) {
>>> val url =
>>> "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
>>> var sql = "insert into loggingserver1 values "
>>> data.foreach(line => sql += line)
>>> sql = sql.dropRight(1)
>>> sql += ";"
>>> logger.info(sql)
>>> var conn: java.sql.Connection = null
>>> try {
>>>   conn = DriverManager.getConnection(url)
>>>   val statement = conn.createStatement()
>>>   statement.executeUpdate(sql)
>>> } catch {
>>>   case e: Exception => logger.error(e.getMessage())
>>> } finally {
>>>   if (conn != null) {
>>> conn.close
>>>   }
>>> }
>>>   }
>>>
>>> I am not sure whether the leakage originates from Kafka connector or the
>>> sql connections.
>>>
>>> Bill
>>>
>>> On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu  wrote:
>>>
>>>> Can you run the command 'ulimit -n' to see the current limit ?
>>>>
>>>> To configure ulimit settings on Ubuntu, edit
>>>> */etc/security/limits.conf*
>>>> Cheers
>>>>
>>>> On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am using the direct approach to receive real-time data from Kafka in
>>>>> the following link:
>>>>>
>>>>> https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
>>>>>
>>>>>
>>>>> My code follows the word count direct example:
>>>>>
>>>>>
>>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
>>>>>
>>>>>
>>>>>
>>>>> After around 12 hours, I got the following error messages in Spark log:
>>>>>
>>>>> 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
>>>>> 143033869 ms
>>>>> org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
>>>>> many open files, java.io.IOException: Too many open files,
>>>>> java.io.IOException: Too many open files, java.io.IOException: Too many
>>>>> open files, java.io.IOException: Too many open files)
>>>>> at
>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>> at
>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>> at
>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>> at
>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>> at
>>>>> scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>> at
>>>

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Tathagata Das
Is the function ingestToMysql running on the driver or on the executors?
Accordingly you can try debugging while running in a distributed manner,
with and without calling the function.

If you don't get "too many open files" without calling ingestToMysql(), the
problem is likely to be in ingestToMysql().
If you get the problem even without calling ingestToMysql(), then the
problem may be in Kafka. If the problem is occurring in the driver, then it's
the DirectKafkaInputDStream code. If the problem is occurring in the
executor, then the problem is in KafkaRDD.

TD

On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu  wrote:

> Maybe add statement.close() in finally block ?
>
> Streaming / Kafka experts may have better insight.
>
> On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay 
> wrote:
>
>> Thanks for the suggestion. I ran the command and the limit is 1024.
>>
>> Based on my understanding, the connector to Kafka should not open so many
>> files. Do you think there is possible socket leakage? BTW, in every batch
>> which is 5 seconds, I output some results to mysql:
>>
>>   def ingestToMysql(data: Array[String]) {
>> val url =
>> "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
>> var sql = "insert into loggingserver1 values "
>> data.foreach(line => sql += line)
>> sql = sql.dropRight(1)
>> sql += ";"
>> logger.info(sql)
>> var conn: java.sql.Connection = null
>> try {
>>   conn = DriverManager.getConnection(url)
>>   val statement = conn.createStatement()
>>   statement.executeUpdate(sql)
>> } catch {
>>   case e: Exception => logger.error(e.getMessage())
>> } finally {
>>   if (conn != null) {
>> conn.close
>>   }
>> }
>>   }
>>
>> I am not sure whether the leakage originates from Kafka connector or the
>> sql connections.
>>
>> Bill
>>
>> On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu  wrote:
>>
>>> Can you run the command 'ulimit -n' to see the current limit ?
>>>
>>> To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
>>> Cheers
>>>
>>> On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am using the direct approach to receive real-time data from Kafka in
>>>> the following link:
>>>>
>>>> https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
>>>>
>>>>
>>>> My code follows the word count direct example:
>>>>
>>>>
>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
>>>>
>>>>
>>>>
>>>> After around 12 hours, I got the following error messages in Spark log:
>>>>
>>>> 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
>>>> 143033869 ms
>>>> org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
>>>> many open files, java.io.IOException: Too many open files,
>>>> java.io.IOException: Too many open files, java.io.IOException: Too many
>>>> open files, java.io.IOException: Too many open files)
>>>> at
>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>> at
>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>> at
>>>> scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>> at scala.Option.orElse(Option.scala:257)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>>> at
>>>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>>> at
>>>> org.apache.spark.str

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Ted Yu
Maybe add statement.close() in finally block ?

Streaming / Kafka experts may have better insight.
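
For concreteness, a sketch of the quoted ingestToMysql with that change applied:
the same structure as below, but with the Statement closed alongside the
Connection in the finally block. The url, table name and logger are taken from
the original post, so this is illustrative rather than tested code.

import java.sql.{Connection, DriverManager, Statement}

def ingestToMysql(data: Array[String]) {
  val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
  var sql = "insert into loggingserver1 values "
  data.foreach(line => sql += line)
  sql = sql.dropRight(1) + ";"
  logger.info(sql)
  var conn: Connection = null
  var statement: Statement = null
  try {
    conn = DriverManager.getConnection(url)
    statement = conn.createStatement()
    statement.executeUpdate(sql)
  } catch {
    case e: Exception => logger.error(e.getMessage())
  } finally {
    if (statement != null) statement.close()
    if (conn != null) conn.close()
  }
}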

On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay 
wrote:

> Thanks for the suggestion. I ran the command and the limit is 1024.
>
> Based on my understanding, the connector to Kafka should not open so many
> files. Do you think there is possible socket leakage? BTW, in every batch
> which is 5 seconds, I output some results to mysql:
>
>   def ingestToMysql(data: Array[String]) {
> val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
> var sql = "insert into loggingserver1 values "
> data.foreach(line => sql += line)
> sql = sql.dropRight(1)
> sql += ";"
> logger.info(sql)
> var conn: java.sql.Connection = null
> try {
>   conn = DriverManager.getConnection(url)
>   val statement = conn.createStatement()
>   statement.executeUpdate(sql)
> } catch {
>   case e: Exception => logger.error(e.getMessage())
> } finally {
>   if (conn != null) {
> conn.close
>   }
> }
>   }
>
> I am not sure whether the leakage originates from Kafka connector or the
> sql connections.
>
> Bill
>
> On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu  wrote:
>
>> Can you run the command 'ulimit -n' to see the current limit ?
>>
>> To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
>> Cheers
>>
>> On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay 
>> wrote:
>>
>>> Hi all,
>>>
>>> I am using the direct approach to receive real-time data from Kafka in
>>> the following link:
>>>
>>> https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
>>>
>>>
>>> My code follows the word count direct example:
>>>
>>>
>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
>>>
>>>
>>>
>>> After around 12 hours, I got the following error messages in Spark log:
>>>
>>> 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
>>> 143033869 ms
>>> org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
>>> many open files, java.io.IOException: Too many open files,
>>> java.io.IOException: Too many open files, java.io.IOException: Too many
>>> open files, java.io.IOException: Too many open files)
>>> at
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>> at
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>> at scala.Option.orElse(Option.scala:257)
>>> at
>>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>> at
>>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>> at scala.Option.orElse(Option.scala:257)
>>> at
>>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>> at
>>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>> at
&g

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Bill Jay
Thanks for the suggestion. I ran the command and the limit is 1024.

Based on my understanding, the connector to Kafka should not open so many
files. Do you think there is a possible socket leak? BTW, in every batch,
which is 5 seconds, I output some results to mysql:

  def ingestToMysql(data: Array[String]) {
    val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
    var sql = "insert into loggingserver1 values "
    data.foreach(line => sql += line)
    sql = sql.dropRight(1)
    sql += ";"
    logger.info(sql)
    var conn: java.sql.Connection = null
    try {
      conn = DriverManager.getConnection(url)
      val statement = conn.createStatement()
      statement.executeUpdate(sql)
    } catch {
      case e: Exception => logger.error(e.getMessage())
    } finally {
      if (conn != null) {
        conn.close
      }
    }
  }

I am not sure whether the leakage originates from Kafka connector or the
sql connections.

Bill

On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu  wrote:

> Can you run the command 'ulimit -n' to see the current limit ?
>
> To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
> Cheers
>
> On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay 
> wrote:
>
>> Hi all,
>>
>> I am using the direct approach to receive real-time data from Kafka in
>> the following link:
>>
>> https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
>>
>>
>> My code follows the word count direct example:
>>
>>
>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
>>
>>
>>
>> After around 12 hours, I got the following error messages in Spark log:
>>
>> 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
>> 143033869 ms
>> org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
>> many open files, java.io.IOException: Too many open files,
>> java.io.IOException: Too many open files, java.io.IOException: Too many
>> open files, java.io.IOException: Too many open files)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>> at scala.Option.orElse(Option.scala:257)
>> at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>> at
>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>> at scala.Option.orElse(Option.scala:257)
>> at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>> at
>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>> at scala.Option.orElse(Option.scala:257)
>> at
>> org.apache.spark.streaming.

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Ted Yu
Can you run the command 'ulimit -n' to see the current limit ?

To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
Cheers

On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay 
wrote:

> Hi all,
>
> I am using the direct approach to receive real-time data from Kafka in the
> following link:
>
> https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
>
>
> My code follows the word count direct example:
>
>
> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
>
>
>
> After around 12 hours, I got the following error messages in Spark log:
>
> 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
> 143033869 ms
> org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many
> open files, java.io.IOException: Too many open files, java.io.IOException:
> Too many open files, java.io.IOException: Too many open files,
> java.io.IOException: Too many open files)
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
> at
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
> at
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
> at
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
> at
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply

Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Bill Jay
Hi all,

I am using the direct approach to receive real-time data from Kafka, as
described at the following link:

https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


My code follows the word count direct example:

https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
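
(For context, the direct approach in that example boils down to roughly the
following; a sketch against the Spark 1.3 API, with the broker address and topic
name as placeholders rather than values from this job.)

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("DirectKafkaSketch")
val ssc = new StreamingContext(sparkConf, Seconds(5))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder broker
val topics = Set("mytopic")                                      // placeholder topic

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

messages.map(_._2)
  .flatMap(_.split(" "))
  .map(word => (word, 1L))
  .reduceByKey(_ + _)
  .print()

ssc.start()
ssc.awaitTermination()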



After around 12 hours, I got the following error messages in Spark log:

15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
143033869 ms
org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many
open files, java.io.IOException: Too many open files, java.io.IOException:
Too many open files, java.io.IOException: Too many open files,
java.io.IOException: Too many open files)
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at

Re: Too many open files

2015-03-30 Thread Masf
I'm executing my application in local mode (with --master local[*]).

I'm using Ubuntu and I've put "session required  pam_limits.so" into
/etc/pam.d/common-session,
but it doesn't work.

On Mon, Mar 30, 2015 at 4:08 PM, Ted Yu  wrote:

> bq. In /etc/security/limits.conf set the next values:
>
> Have you done the above modification on all the machines in your Spark
> cluster ?
>
> If you use Ubuntu, be sure that the /etc/pam.d/common-session file
> contains the following line:
>
> session required  pam_limits.so
>
>
> On Mon, Mar 30, 2015 at 5:08 AM, Masf  wrote:
>
>> Hi.
>>
>> I've done a relogin; in fact, 'ulimit -n' now returns 1000000, but it still
>> crashes.
>> I'm doing reduceByKey and SparkSQL mixed over 17 files (250MB-500MB/file)
>>
>>
>> Regards.
>> Miguel Angel.
>>
>> On Mon, Mar 30, 2015 at 1:52 PM, Akhil Das 
>> wrote:
>>
>>> Mostly, you will have to restart the machines to get the ulimit effect
>>> (or relogin). What operation are you doing? Are you doing too many
>>> repartitions?
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Mar 30, 2015 at 4:52 PM, Masf  wrote:
>>>
>>>> Hi
>>>>
>>>> I have a problem with temp data in Spark. I have set
>>>> spark.shuffle.manager to "SORT". In /etc/security/limits.conf I set the next
>>>> values:
>>>> *   soft    nofile  1000000
>>>> *   hard    nofile  1000000
>>>> In spark-env.sh I set ulimit -n 1000000
>>>> I've restarted the Spark service and it continues crashing (Too many
>>>> open files)
>>>>
>>>> How can I resolve? I'm executing Spark 1.2.0 in Cloudera 5.3.2
>>>>
>>>> java.io.FileNotFoundException:
>>>> /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
>>>> (Too many open files)
>>>> at java.io.FileOutputStream.open(Native Method)
>>>> at java.io.FileOutputStream.(FileOutputStream.java:221)
>>>> at
>>>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
>>>> at
>>>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
>>>> at
>>>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
>>>> at scala.Array$.fill(Array.scala:267)
>>>> at
>>>> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
>>>> at
>>>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
>>>> at
>>>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
>>>> at
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>> at
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>> at
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> 15/03/30 11:54:18 WARN TaskSetManager: Lost task 22.0 in stage 3.0 (TID
>>>> 27, localhost): java.io.FileNotFoundException:
>>>> /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
>>>> (Too many open files)
>>>> at java.io.FileOutputStream.open(Native Method)
>>>> at java.io.FileOutputStream.(FileOutputStream.java:221)
>>>> at
>>>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
>>>> at
>>>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
>>>> at
>>>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
>>>> at scala.Array$.fill(Array.scala:267)
>>>> 

Re: Too many open files

2015-03-30 Thread Ted Yu
bq. In /etc/security/limits.conf set the next values:

Have you done the above modification on all the machines in your Spark
cluster ?

If you use Ubuntu, be sure that the /etc/pam.d/common-session file contains
the following line:

session required  pam_limits.so
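
A quick way to sanity-check what the executor JVMs actually see is to read
/proc/self/limits from inside a job. This is only a rough sketch (Linux-only,
tasks are not guaranteed to land on every node, and it is not something from
this thread):

    import scala.io.Source

    // run many small tasks and collect the per-host "Max open files" line
    val limits = sc.parallelize(1 to 1000, 1000).mapPartitions { _ =>
      val line = Source.fromFile("/proc/self/limits").getLines()
        .find(_.startsWith("Max open files")).getOrElse("not found")
      Iterator((java.net.InetAddress.getLocalHost.getHostName, line))
    }.distinct().collect()
    limits.foreach(println)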


On Mon, Mar 30, 2015 at 5:08 AM, Masf  wrote:

> Hi.
>
> I've done a relogin; in fact, 'ulimit -n' now returns 1000000, but it still
> crashes.
> I'm doing reduceByKey and SparkSQL mixed over 17 files (250MB-500MB/file)
>
>
> Regards.
> Miguel Angel.
>
> On Mon, Mar 30, 2015 at 1:52 PM, Akhil Das 
> wrote:
>
>> Mostly, you will have to restart the machines to get the ulimit effect
>> (or relogin). What operation are you doing? Are you doing too many
>> repartitions?
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Mar 30, 2015 at 4:52 PM, Masf  wrote:
>>
>>> Hi
>>>
>>> I have a problem with temp data in Spark. I have set
>>> spark.shuffle.manager to "SORT". In /etc/security/limits.conf I set the next
>>> values:
>>> *   soft    nofile  1000000
>>> *   hard    nofile  1000000
>>> In spark-env.sh I set ulimit -n 1000000
>>> I've restarted the Spark service and it continues crashing (Too many
>>> open files)
>>>
>>> How can I resolve? I'm executing Spark 1.2.0 in Cloudera 5.3.2
>>>
>>> java.io.FileNotFoundException:
>>> /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
>>> (Too many open files)
>>> at java.io.FileOutputStream.open(Native Method)
>>> at java.io.FileOutputStream.(FileOutputStream.java:221)
>>> at
>>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
>>> at
>>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
>>> at
>>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
>>> at scala.Array$.fill(Array.scala:267)
>>> at
>>> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
>>> at
>>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
>>> at
>>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>     at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>> 15/03/30 11:54:18 WARN TaskSetManager: Lost task 22.0 in stage 3.0 (TID
>>> 27, localhost): java.io.FileNotFoundException:
>>> /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
>>> (Too many open files)
>>> at java.io.FileOutputStream.open(Native Method)
>>> at java.io.FileOutputStream.(FileOutputStream.java:221)
>>> at
>>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
>>> at
>>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
>>> at
>>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
>>> at scala.Array$.fill(Array.scala:267)
>>> at
>>> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
>>> at
>>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
>>> at
>>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>>
>>> Thanks!
>>> --
>>>
>>>
>>> Regards.
>>> Miguel Ángel
>>>
>>
>>
>
>
> --
>
>
> Saludos.
> Miguel Ángel
>


Re: Too many open files

2015-03-30 Thread Masf
Hi.

I've done a relogin; in fact, 'ulimit -n' now returns 1000000, but it still
crashes.
I'm doing reduceByKey and SparkSQL mixed over 17 files (250MB-500MB/file).


Regards.
Miguel Angel.

On Mon, Mar 30, 2015 at 1:52 PM, Akhil Das 
wrote:

> Mostly, you will have to restart the machines to get the ulimit effect (or
> relogin). What operation are you doing? Are you doing too many
> repartitions?
>
> Thanks
> Best Regards
>
> On Mon, Mar 30, 2015 at 4:52 PM, Masf  wrote:
>
>> Hi
>>
>> I have a problem with temp data in Spark. I have set
>> spark.shuffle.manager to "SORT". In /etc/security/limits.conf I set the next
>> values:
>> *   soft    nofile  1000000
>> *   hard    nofile  1000000
>> In spark-env.sh I set ulimit -n 1000000
>> I've restarted the Spark service and it continues crashing (Too many open
>> files)
>>
>> How can I resolve? I'm executing Spark 1.2.0 in Cloudera 5.3.2
>>
>> java.io.FileNotFoundException:
>> /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
>> (Too many open files)
>> at java.io.FileOutputStream.open(Native Method)
>> at java.io.FileOutputStream.(FileOutputStream.java:221)
>> at
>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
>> at
>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
>> at
>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
>> at scala.Array$.fill(Array.scala:267)
>> at
>> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
>> at
>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
>> at
>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> 15/03/30 11:54:18 WARN TaskSetManager: Lost task 22.0 in stage 3.0 (TID
>> 27, localhost): java.io.FileNotFoundException:
>> /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
>> (Too many open files)
>> at java.io.FileOutputStream.open(Native Method)
>> at java.io.FileOutputStream.(FileOutputStream.java:221)
>> at
>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
>> at
>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
>> at
>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
>> at scala.Array$.fill(Array.scala:267)
>> at
>> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
>> at
>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
>> at
>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> Thanks!
>> --
>>
>>
>> Regards.
>> Miguel Ángel
>>
>
>


-- 


Saludos.
Miguel Ángel


Re: Too many open files

2015-03-30 Thread Akhil Das
Mostly, you will have to restart the machines to get the ulimit effect (or
relogin). What operation are you doing? Are you doing too many
repartitions?

Thanks
Best Regards

On Mon, Mar 30, 2015 at 4:52 PM, Masf  wrote:

> Hi
>
> I have a problem with temp data in Spark. I have set
> spark.shuffle.manager to "SORT". In /etc/security/limits.conf I set the next
> values:
> *   soft    nofile  1000000
> *   hard    nofile  1000000
> In spark-env.sh I set ulimit -n 1000000
> I've restarted the Spark service and it continues crashing (Too many open
> files)
>
> How can I resolve? I'm executing Spark 1.2.0 in Cloudera 5.3.2
>
> java.io.FileNotFoundException:
> /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
> (Too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.(FileOutputStream.java:221)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
> at
> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
> at
> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
> at scala.Array$.fill(Array.scala:267)
> at
> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
> at
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 15/03/30 11:54:18 WARN TaskSetManager: Lost task 22.0 in stage 3.0 (TID
> 27, localhost): java.io.FileNotFoundException:
> /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
> (Too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.(FileOutputStream.java:221)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
> at
> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
> at
> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
> at scala.Array$.fill(Array.scala:267)
> at
> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
> at
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> Thanks!
> --
>
>
> Regards.
> Miguel Ángel
>


Too many open files

2015-03-30 Thread Masf
Hi

I have a problem with temp data in Spark. I have set
spark.shuffle.manager to "SORT". In /etc/security/limits.conf I set the next
values:
*   soft    nofile  1000000
*   hard    nofile  1000000
In spark-env.sh I set ulimit -n 1000000
I've restarted the Spark service and it continues crashing (Too many open
files)

How can I resolve? I'm executing Spark 1.2.0 in Cloudera 5.3.2

java.io.FileNotFoundException:
/tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
(Too many open files)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.(FileOutputStream.java:221)
at
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
at scala.Array$.fill(Array.scala:267)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/03/30 11:54:18 WARN TaskSetManager: Lost task 22.0 in stage 3.0 (TID 27,
localhost): java.io.FileNotFoundException:
/tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c
(Too many open files)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.(FileOutputStream.java:221)
at
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
at scala.Array$.fill(Array.scala:267)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)



Thanks!
-- 


Regards.
Miguel Ángel


RE: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

2015-03-20 Thread Shuai Zheng
Below is the output:

 

core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 1967947
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 2024
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 1967947
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

 

I have set the max open files to 2024 with ulimit -n 2024, but I get the same issue.

I am not sure whether that is a reasonable setting.

 

Actually I am running a loop; each iteration tries to sort only 3GB of data. It
runs very quickly in the first iteration and slows down in the second. In each
iteration I start and destroy the context (because I want to clean up the temp
files created under the tmp folder, which take a lot of space). Otherwise it is
just the default settings.

 

My logic:

 

For loop:
    val sc = new SparkContext(conf)
    // load the Parquet input via Spark SQL
    // sortByKey over the loaded rows
    sc.stop()
End

 

And I run on an EC2 c3.8xlarge, Amazon Linux AMI 2014.09.2 (HVM).
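
As a rough sketch of one iteration (not the original code: plain text input
instead of Parquet, hypothetical paths, and an explicit partition count passed
to sortByKey so the shuffle does not fan out into too many simultaneously open
files):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._   // pair/ordered RDD implicits on 1.2.x

    val conf = new SparkConf().setAppName("sort-loop")
    val inputPaths = Seq("s3n://bucket/run-000", "s3n://bucket/run-001")  // hypothetical
    for (path <- inputPaths) {
      val sc = new SparkContext(conf)
      val pairs = sc.textFile(path).map { line =>
        val f = line.split("\t")
        (f(0), line)                          // key on the first column
      }
      pairs.sortByKey(ascending = true, numPartitions = 200)
           .saveAsTextFile(path + ".sorted")
      sc.stop()
    }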

 

From: java8964 [mailto:java8...@hotmail.com] 
Sent: Friday, March 20, 2015 3:54 PM
To: user@spark.apache.org
Subject: RE: com.esotericsoftware.kryo.KryoException: java.io.IOException:
File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

 

Do you think the ulimit for the user running Spark on your nodes?

 

Can you run "ulimit -a" under the user who is running spark on the executor
node? Does the result make sense for the data you are trying to process?

 

Yong

 

  _  

From: szheng.c...@gmail.com
To: user@spark.apache.org
Subject: com.esotericsoftware.kryo.KryoException: java.io.IOException: File
too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Date: Fri, 20 Mar 2015 15:28:26 -0400

Hi All,

 

I try to run a simple sort by on 1.2.1. And it always give me below two
errors:

 

1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID
35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException:
/tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826 (Too many open files)

 

And then I switch to:

conf.set("spark.shuffle.consolidateFiles", "true")

.set("spark.shuffle.manager", "SORT")

 

Then I get the error:

 

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure:
Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal):
com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large

at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)

 

I roughly know the first issue is because Spark shuffle creates too many
local temp files (and I don't know the solution, because looks like my
solution also cause other issues), but I am not sure what means is the
second error. 

 

Anyone knows the solution for both cases?

 

Regards,

 

Shuai



RE: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

2015-03-20 Thread java8964
Do you think it is the ulimit for the user running Spark on your nodes?
Can you run "ulimit -a" under the user who is running spark on the executor 
node? Does the result make sense for the data you are trying to process?
Yong
From: szheng.c...@gmail.com
To: user@spark.apache.org
Subject: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too 
large vs FileNotFoundException (Too many open files) on spark 1.2.1
Date: Fri, 20 Mar 2015 15:28:26 -0400

Hi All, I try to run a simple sort by on 1.2.1. And it always give me below two 
errors: 1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 
(TID 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException: 
/tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826
 (Too many open files) And then I switch 
to:conf.set("spark.shuffle.consolidateFiles", 
"true").set("spark.shuffle.manager", "SORT") Then I get the error: Exception in 
thread "main" org.apache.spark.SparkException: Job aborted due to stage 
failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 
in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal): 
com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large
at com.esotericsoftware.kryo.io.Output.flush(Output.java:157) I roughly 
know the first issue is because Spark shuffle creates too many local temp files 
(and I don’t know the solution, because looks like my solution also cause other 
issues), but I am not sure what means is the second error.  Anyone knows the 
solution for both cases? Regards, Shuai 
  

Re: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

2015-03-20 Thread Charles Feduke
Assuming you are on Linux, what is your /etc/security/limits.conf set for
nofile/soft (number of open file handles)?

On Fri, Mar 20, 2015 at 3:29 PM Shuai Zheng  wrote:

> Hi All,
>
>
>
> I try to run a simple sort by on 1.2.1. And it always give me below two
> errors:
>
>
>
> 1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID
> 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException:
> /tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826
> (Too many open files)
>
>
>
> And then I switch to:
>
> conf.set("spark.shuffle.consolidateFiles", "true")
>
> .set("spark.shuffle.manager", "SORT")
>
>
>
> Then I get the error:
>
>
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent
> failure: Lost task 5.3 in stage 1.0 (TID 36,
> ip-10-169-217-47.ec2.internal): com.esotericsoftware.kryo.KryoException:
> java.io.IOException: File too large
>
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)
>
>
>
> I roughly know the first issue is because Spark shuffle creates too many
> local temp files (and I don’t know the solution, because looks like my
> solution also cause other issues), but I am not sure what means is the
> second error.
>
>
>
> Anyone knows the solution for both cases?
>
>
>
> Regards,
>
>
>
> Shuai
>


com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

2015-03-20 Thread Shuai Zheng
Hi All,

 

I am trying to run a simple sortBy on 1.2.1, and it always gives me the two
errors below:

 

1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID
35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException:
/tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826 (Too many open files)

 

And then I switch to:

conf.set("spark.shuffle.consolidateFiles", "true")

.set("spark.shuffle.manager", "SORT")

 

Then I get the error:

 

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure:
Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal):
com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large

at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)

 

I roughly know the first issue arises because the Spark shuffle creates too many
local temp files (and I don't know the solution, because it looks like my
workaround also causes other issues), but I am not sure what the second error
means.

 

Anyone knows the solution for both cases?

 

Regards,

 

Shuai



Re: Too many open files

2014-08-29 Thread Ye Xianjin
Oops, the last reply didn't go to the user list. Mail app's fault.

Shuffling happens in the cluster, so you need to change all the nodes in the
cluster.



Sent from my iPhone

> On Aug 30, 2014, at 3:10, Sudha Krishna  wrote:
> 
> Hi,
> 
> Thanks for your response. Do you know if I need to change this limit on all 
> the cluster nodes or just the master?
> Thanks
> 
>> On Aug 29, 2014 11:43 AM, "Ye Xianjin"  wrote:
>> 1024 for the number of file limit is most likely too small for Linux 
>> Machines on production. Try to set to 65536 or unlimited if you can. The too 
>> many open files error occurs because there are a lot of shuffle files(if 
>> wrong, please correct me):
>> 
>> Sent from my iPhone
>> 
>> > On Aug 30, 2014, at 2:06, SK  wrote:
>> >
>> > Hi,
>> >
>> > I am having the same problem reported by Michael. I am trying to open 30
>> > files. ulimit -n  shows the limit is 1024. So I am not sure why the program
>> > is failing with  "Too many open files" error. The total size of all the 30
>> > files is 230 GB.
>> > I am running the job on a cluster with 10 nodes, each having 16 GB. The
>> > error appears to be happening at the distinct() stage.
>> >
>> > Here is my program. In the following code, are all the 10 nodes trying to
>> > open all of the 30 files or are the files distributed among the 30 nodes?
>> >
>> >val baseFile = "/mapr/mapr_dir/files_2013apr*"
>> >valx = sc.textFile(baseFile)).map { line =>
>> >val
>> > fields = line.split("\t")
>> >
>> > (fields(11), fields(6))
>> >
>> > }.distinct().countByKey()
>> >val xrdd = sc.parallelize(x.toSeq)
>> >xrdd.saveAsTextFile(...)
>> >
>> > Instead of using the glob *, I guess I can try using a for loop to read the
>> > files one by one if that helps, but not sure if there is a more efficient
>> > solution.
>> >
>> > The following is the error transcript:
>> >
>> > Job aborted due to stage failure: Task 1.0:201 failed 4 times, most recent
>> > failure: Exception failure in TID 902 on host 192.168.13.11:
>> > java.io.FileNotFoundException:
>> > /tmp/spark-local-20140829131200-0bb7/08/shuffle_0_201_999 (Too many open
>> > files)
>> > java.io.FileOutputStream.open(Native Method)
>> > java.io.FileOutputStream.(FileOutputStream.java:221)
>> > org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
>> > org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
>> > org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
>> > org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
>> > scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> > org.apache.spark.util.collection.AppendOnlyMap$$anon$1.foreach(AppendOnlyMap.scala:159)
>> > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>> > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>> > org.apache.spark.scheduler.Task.run(Task.scala:51)
>> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> > java.lang.Thread.run(Thread.java:744) Driver stacktrace:
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context: 
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-tp1464p13144.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >


Re: Too many open files

2014-08-29 Thread SK
Hi,

I am having the same problem reported by Michael. I am trying to open 30
files. ulimit -n shows the limit is 1024, so I am not sure why the program
is failing with a "Too many open files" error. The total size of all 30
files is 230 GB.
I am running the job on a cluster with 10 nodes, each having 16 GB. The
error appears to be happening at the distinct() stage.

Here is my program. In the following code, are all 10 nodes trying to
open all of the 30 files, or are the files distributed among the 10 nodes?

val baseFile = "/mapr/mapr_dir/files_2013apr*"
val x = sc.textFile(baseFile).map { line =>
  val fields = line.split("\t")
  (fields(11), fields(6))
}.distinct().countByKey()
val xrdd = sc.parallelize(x.toSeq)
xrdd.saveAsTextFile(...)

Instead of using the glob *, I guess I can try using a for loop to read the
files one by one if that helps, but not sure if there is a more efficient
solution. 
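
For what it's worth, a rough sketch of the per-file variant (the file names
below are hypothetical expansions of the glob). Note that reading the files one
by one does not by itself reduce the shuffle fan-out; distinct() also takes an
explicit partition count, which is what actually bounds the number of reducers:

    import org.apache.spark.SparkContext._   // pair-RDD implicits on older Spark

    val files = Seq(
      "/mapr/mapr_dir/files_2013apr01",      // hypothetical file names
      "/mapr/mapr_dir/files_2013apr02"
    )
    val perFile = files.map { f =>
      sc.textFile(f).map { line =>
        val fields = line.split("\t")
        (fields(11), fields(6))
      }
    }
    // union the per-file RDDs, then deduplicate with a bounded reducer count
    val x = sc.union(perFile).distinct(numPartitions = 200).countByKey()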

The following is the error transcript: 

Job aborted due to stage failure: Task 1.0:201 failed 4 times, most recent
failure: Exception failure in TID 902 on host 192.168.13.11:
java.io.FileNotFoundException:
/tmp/spark-local-20140829131200-0bb7/08/shuffle_0_201_999 (Too many open
files) 
java.io.FileOutputStream.open(Native Method)
java.io.FileOutputStream.(FileOutputStream.java:221)
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
org.apache.spark.util.collection.AppendOnlyMap$$anon$1.foreach(AppendOnlyMap.scala:159)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744) Driver stacktrace:





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-tp1464p13144.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Sorting data large data- "too many open files" exception

2014-05-26 Thread Mayur Rustagi
Set the ulimit quite high as root and that should resolve it.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Mon, May 26, 2014 at 7:48 PM, Matt Kielo  wrote:

> Hello,
>
> I currently have a task always failing with
> "java.io.FileNotFoundException: [...]/shuffle_0_257_2155 (Too many open
> files)" when I run sorting operations such as distinct, sortByKey, or
> reduceByKey on a large number of partitions.
>
> Im working with 365 GB of data which is being split into 5959 partitions.
> The cluster Im using has over 1000GB of memory with 20GB of memory per node.
>
> I have tried adding .set("spark.shuffle.consolidate.files",  "true") when
> making my spark context but it doesnt seem to make a difference.
>
>  Has anyone else had similar problems?
>
> Best regards,
>
> Matt
>
>


Sorting data large data- "too many open files" exception

2014-05-26 Thread Matt Kielo
Hello,

I currently have a task always failing with "java.io.FileNotFoundException:
[...]/shuffle_0_257_2155 (Too many open files)" when I run sorting
operations such as distinct, sortByKey, or reduceByKey on a large number of
partitions.

I'm working with 365 GB of data, which is being split into 5959 partitions.
The cluster I'm using has over 1000GB of memory, with 20GB of memory per node.

I have tried adding .set("spark.shuffle.consolidate.files", "true") when
creating my Spark context, but it doesn't seem to make a difference.
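
For comparison, a minimal sketch of the spelling used for that flag in the
other threads here (spark.shuffle.consolidateFiles, camel case, no dot before
"Files"); the app name is a placeholder:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("large-sort")                        // placeholder
      .set("spark.shuffle.consolidateFiles", "true")   // consolidate shuffle outputs
      .set("spark.shuffle.manager", "SORT")
    val sc = new SparkContext(conf)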

 Has anyone else had similar problems?

Best regards,

Matt


Re: "Too many open files" exception on reduceByKey

2014-03-11 Thread Matthew Cheah
Sorry, I also have some follow-up questions.

"In general if a node in your cluster has C assigned cores and you run a
job with X reducers then Spark will open C*X files in parallel and start
writing."

Some questions came to mind just now:
1) It would be nice to have a brief overview of what these files are
being used for.
2) Are these C*X files being opened on each machine? Also, is C the total
number of cores across all machines in the cluster?

Thanks,

-Matt Cheah


On Tue, Mar 11, 2014 at 4:35 PM, Matthew Cheah wrote:

> Thanks. Just curious, is there a default number of reducers that are used?
>
> -Matt Cheah
>
>
> On Mon, Mar 10, 2014 at 7:22 PM, Patrick Wendell wrote:
>
>> Hey Matt,
>>
>> The best way is definitely just to increase the ulimit if possible,
>> this is sort of an assumption we make in Spark that clusters will be
>> able to move it around.
>>
>> You might be able to hack around this by decreasing the number of
>> reducers but this could have some performance implications for your
>> job.
>>
>> In general if a node in your cluster has C assigned cores and you run
>> a job with X reducers then Spark will open C*X files in parallel and
>> start writing. Shuffle consolidation will help decrease the total
>> number of files created but the number of file handles open at any
>> time doesn't change so it won't help the ulimit problem.
>>
>> This means you'll have to use fewer reducers (e.g. pass reduceByKey a
>> number of reducers) or use fewer cores on each machine.
>>
>> - Patrick
>>
>> On Mon, Mar 10, 2014 at 10:41 AM, Matthew Cheah
>>  wrote:
>> > Hi everyone,
>> >
>> > My team (cc'ed in this e-mail) and I are running a Spark reduceByKey
>> > operation on a cluster of 10 slaves where I don't have the privileges
>> to set
>> > "ulimit -n" to a higher number. I'm running on a cluster where "ulimit
>> -n"
>> > returns 1024 on each machine.
>> >
>> > When I attempt to run this job with the data originating from a text
>> file,
>> > stored in an HDFS cluster running on the same nodes as the Spark
>> cluster,
>> > the job crashes with the message, "Too many open files".
>> >
>> > My question is, why are so many files being created, and is there a way
>> to
>> > configure the Spark context to avoid spawning that many files? I am
>> already
>> > setting spark.shuffle.consolidateFiles to true.
>> >
>> > I want to repeat - I can't change the maximum number of open file
>> > descriptors on the machines. This cluster is not owned by me and the
>> system
>> > administrator is responding quite slowly.
>> >
>> > Thanks,
>> >
>> > -Matt Cheah
>>
>
>


Re: "Too many open files" exception on reduceByKey

2014-03-11 Thread Matthew Cheah
Thanks. Just curious, is there a default number of reducers that are used?

-Matt Cheah


On Mon, Mar 10, 2014 at 7:22 PM, Patrick Wendell  wrote:

> Hey Matt,
>
> The best way is definitely just to increase the ulimit if possible,
> this is sort of an assumption we make in Spark that clusters will be
> able to move it around.
>
> You might be able to hack around this by decreasing the number of
> reducers but this could have some performance implications for your
> job.
>
> In general if a node in your cluster has C assigned cores and you run
> a job with X reducers then Spark will open C*X files in parallel and
> start writing. Shuffle consolidation will help decrease the total
> number of files created but the number of file handles open at any
> time doesn't change so it won't help the ulimit problem.
>
> This means you'll have to use fewer reducers (e.g. pass reduceByKey a
> number of reducers) or use fewer cores on each machine.
>
> - Patrick
>
> On Mon, Mar 10, 2014 at 10:41 AM, Matthew Cheah
>  wrote:
> > Hi everyone,
> >
> > My team (cc'ed in this e-mail) and I are running a Spark reduceByKey
> > operation on a cluster of 10 slaves where I don't have the privileges to
> set
> > "ulimit -n" to a higher number. I'm running on a cluster where "ulimit
> -n"
> > returns 1024 on each machine.
> >
> > When I attempt to run this job with the data originating from a text
> file,
> > stored in an HDFS cluster running on the same nodes as the Spark cluster,
> > the job crashes with the message, "Too many open files".
> >
> > My question is, why are so many files being created, and is there a way
> to
> > configure the Spark context to avoid spawning that many files? I am
> already
> > setting spark.shuffle.consolidateFiles to true.
> >
> > I want to repeat - I can't change the maximum number of open file
> > descriptors on the machines. This cluster is not owned by me and the
> system
> > administrator is responding quite slowly.
> >
> > Thanks,
> >
> > -Matt Cheah
>


Re: "Too many open files" exception on reduceByKey

2014-03-10 Thread Patrick Wendell
Hey Matt,

The best way is definitely just to increase the ulimit if possible,
this is sort of an assumption we make in Spark that clusters will be
able to move it around.

You might be able to hack around this by decreasing the number of
reducers but this could have some performance implications for your
job.

In general if a node in your cluster has C assigned cores and you run
a job with X reducers then Spark will open C*X files in parallel and
start writing. Shuffle consolidation will help decrease the total
number of files created but the number of file handles open at any
time doesn't change so it won't help the ulimit problem.

This means you'll have to use fewer reducers (e.g. pass reduceByKey a
number of reducers) or use fewer cores on each machine.
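
A rough sketch of what passing an explicit reducer count looks like (assuming a
SparkContext named sc; the value 64 is an arbitrary example, not from this
thread):

    import org.apache.spark.SparkContext._   // pair-RDD implicits on older Spark

    val pairs = sc.textFile("hdfs:///input")
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
    // the second argument caps the number of reducers, so with C cores per node
    // roughly C * 64 shuffle files are open at once instead of C * default parallelism
    val counts = pairs.reduceByKey(_ + _, 64)
    counts.saveAsTextFile("hdfs:///output")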

- Patrick

On Mon, Mar 10, 2014 at 10:41 AM, Matthew Cheah
 wrote:
> Hi everyone,
>
> My team (cc'ed in this e-mail) and I are running a Spark reduceByKey
> operation on a cluster of 10 slaves where I don't have the privileges to set
> "ulimit -n" to a higher number. I'm running on a cluster where "ulimit -n"
> returns 1024 on each machine.
>
> When I attempt to run this job with the data originating from a text file,
> stored in an HDFS cluster running on the same nodes as the Spark cluster,
> the job crashes with the message, "Too many open files".
>
> My question is, why are so many files being created, and is there a way to
> configure the Spark context to avoid spawning that many files? I am already
> setting spark.shuffle.consolidateFiles to true.
>
> I want to repeat - I can't change the maximum number of open file
> descriptors on the machines. This cluster is not owned by me and the system
> administrator is responding quite slowly.
>
> Thanks,
>
> -Matt Cheah


"Too many open files" exception on reduceByKey

2014-03-10 Thread Matthew Cheah
Hi everyone,

My team (cc'ed in this e-mail) and I are running a Spark reduceByKey
operation on a cluster of 10 slaves where I don't have the privileges to
set "ulimit -n" to a higher number. I'm running on a cluster where "ulimit
-n" returns 1024 on each machine.

When I attempt to run this job with the data originating from a text file,
stored in an HDFS cluster running on the same nodes as the Spark cluster,
the job crashes with the message, "Too many open files".

My question is, why are so many files being created, and is there a way to
configure the Spark context to avoid spawning that many files? I am already
setting spark.shuffle.consolidateFiles to true.

I want to repeat - I can't change the maximum number of open file
descriptors on the machines. This cluster is not owned by me and the system
administrator is responding quite slowly.

Thanks,

-Matt Cheah


Re: [incubating-0.9.0] Too Many Open Files on Workers

2014-02-26 Thread Rohit Rai
Hello Andy,

This is a problem we have seen when using the CQL Java driver under heavy
read loads: it uses NIO and waits on many pending responses,
which leads to too many open sockets and hence too many open files. Are you by
any chance using async queries?

I am the maintainer of Calliope... Feel free to mail me directly about any
issues/queries you have working with Calliope; I will be glad to assist.

Cheers,
Rohit


*Founder & CEO, **Tuplejump, Inc.*

www.tuplejump.com
*The Data Engineering Platform*


On Fri, Feb 21, 2014 at 3:34 PM, andy petrella wrote:

> MMMmmmh good point !!
>
> Before answering, I tried to use callioppe but I got an issue and since
> the iteration review was near I quickly switched to the datastax driver.
> But I'll get to callioppe soon, with some questions maybe ;-).
>
> Regarding your point (very good one, I've to say), actually I'm creating a
> session and a batch per partitions.
> Now the shamy part... I haven't set any options for the pool :-/. Is there
> some tuning clues? In my case the C* is local (docker image) so maybe
> should i do
> builder.poolingOptions().setMaxConnectionsPerHost(LOCAL, BIGNUMBER)?
>
> The point is, what about this BIGNUMBER... can it be really big? (Sounds
> weird to me, but I don't want to pre-filter options based on feelings).
>
> Thanks for your response
>
> andy
>
> On Fri, Feb 21, 2014 at 10:36 AM, Sourav Chandra <
> sourav.chan...@livestream.com> wrote:
>
>> From stacktrace it looks like you are using datstax cassandra driver and
>> it tried to create cluster.
>>
>> How many connections you are creating in poolingOptions()  i.e. builder.
>> poolingOptions().setMaxConnectionsPerHost(...)
>>
>> Are you creating this per rdd? Might be there are lots of connections
>> created and at last it failed to create any more.
>>
>> Thanks,
>> Sourav
>>
>>
>> On Fri, Feb 21, 2014 at 3:02 PM, andy petrella 
>> wrote:
>>
>>> Hey guyz,
>>>
>>> I've got this issue (see bottom) with Spark, deployed in Standalone mode
>>> on a local docker environment.
>>> I know that I need to raise the ulimit (only 1024 now) but in the
>>> meantime I was just wondering how this could happen.
>>> My gut feeling is because I'm mounting a lot in memory and Spark tries
>>> to dump some RDDs on the FS, and then boom.
>>>
>>> Also, I was wondering if it cannot be a clue that my job is maybe to
>>> eager in memory? How is it something quite normal which such a low ulimit
>>> on workers?
>>>
>>> Thanks a lot (in advance ^^)
>>>
>>> Cheers,
>>> andy
>>>
>>>
>>>
>>> 14/02/21 08:32:15 ERROR Executor: Exception in task ID 472
>>> org.jboss.netty.channel.ChannelException: Failed to create a selector.
>>>  at
>>> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:337)
>>> at
>>> org.jboss.netty.channel.socket.nio.AbstractNioSelector.(AbstractNioSelector.java:95)
>>>  at
>>> org.jboss.netty.channel.socket.nio.AbstractNioWorker.(AbstractNioWorker.java:53)
>>> at org.jboss.netty.channel.socket.nio.NioWorker.(NioWorker.java:45)
>>>  at
>>> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
>>> at
>>> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
>>>  at
>>> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:99)
>>> at
>>> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:69)
>>>  at
>>> org.jboss.netty.channel.socket.nio.NioWorkerPool.(NioWorkerPool.java:39)
>>> at
>>> org.jboss.netty.channel.socket.nio.NioWorkerPool.(NioWorkerPool.java:33)
>>>  at
>>> org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.(NioClientSocketChannelFactory.java:151)
>>> at
>>> org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.(NioClientSocketChannelFactory.java:116)
>>>  at
>>> com.datastax.driver.core.Connection$Factory.(Connection.java:349)
>>> at
>>> com.datastax.driver.core.Connection$Factory.(Connection.java:360)
>>>  at com.datastax.driver.core.Cluster$Manager.(Cluster.java:857)
>>> at com.datastax.driver.core.Cluster$Manager.(Cluster.java:806)
>>>  at com.datastax.driver.core.Cluster.(Cluster.java:76)
>>> at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:132