Failed to create file system watcher service: User limit of inotify instances reached or too many open files
Hi,

When I am doing calculations for, for example, 700 listIDs, it is saving only some 50 rows and then I get seemingly random exceptions. I get the exceptions below when I try to do the calculations on huge data and save the results. Please let me know if you have any suggestions.

Sample code: I have some lakhs (hundreds of thousands) of listIDs. I group the RDD by listID and then do all the ranking calculations using .map:

Step 1: val groupedPartitionRdd = rdd.groupBy(row => row.getInt(6))
Step 2: val outputObjectForGainersAndLosers = groupedPartitionRdd.map(grp => { /* ranking logic and some calculations */ })
Step 3: outputObjectForGainersAndLosers.saveToCassandra("tablename", somecolumns)

I am getting different random exceptions on every spark-submit, am not able to debug them, and am facing a lot of issues:

Caused by: java.lang.RuntimeException: Failed to create file system watcher service: User limit of inotify instances reached or too many open files
Caused by: java.io.IOException: User limit of inotify instances reached or too many open files
Failed to create file system watcher service: User limit of inotify instances reached or too many open files
Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:154)
at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:134)
at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:644)
at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:178)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
at org.apache.spark.network.server.TransportChannelHandler.channelRead

Thanks,
Gopi
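The first "Caused by" points at the kernel's per-user inotify limits rather than the process file-descriptor limit, so raising ulimit -n alone may not help. A minimal sketch of inspecting those limits on a worker node (Linux paths):

```shell
# Inspect the kernel's per-user inotify ceilings. The instance limit is
# often only 128, which a busy Spark node can exhaust, producing exactly
# "User limit of inotify instances reached".
cat /proc/sys/fs/inotify/max_user_instances
cat /proc/sys/fs/inotify/max_user_watches
```

Raising them needs root and would look like `sysctl -w fs.inotify.max_user_instances=1024`, persisted by adding the same key to a file under /etc/sysctl.d/; the value 1024 is an illustrative assumption, not a recommendation from this thread.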
Re: [SparkSQL] too many open files although ulimit set to 1048576
I think your settings are not taking effect. Try adding `ulimit -n 10240` to spark-env.sh.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-too-many-open-files-although-ulimit-set-to-1048576-tp28490p28491.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
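A sketch of that suggestion, assuming a default conf layout; it would need to run on every node, followed by a restart of the Spark daemons so the new limit is inherited:

```shell
# Append the raised limit to spark-env.sh so JVMs launched by the Spark
# scripts inherit it (the SPARK_HOME fallback path is an assumption)
SPARK_ENV="${SPARK_HOME:-/opt/spark}/conf/spark-env.sh"
echo 'ulimit -n 10240' >> "$SPARK_ENV"

# Sanity-check what a fresh shell reports
ulimit -n
```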
Re: Too many open files, why changing ulimit not effecting?
If you are using systemd, you will need to specify the limit in the service file. I had run into this problem and discovered the solution from the following references:

* https://bugzilla.redhat.com/show_bug.cgi?id=754285#c1
* http://serverfault.com/a/678861
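When the workers are launched by systemd, limits.conf and pam_limits are bypassed entirely; the limit has to come from the unit file. A sketch using a drop-in override, where the unit name "spark-worker" and the value are assumptions for illustration:

```shell
# Create a systemd drop-in that raises the open-files limit for the unit,
# then reload and restart so the running JVM picks it up
sudo mkdir -p /etc/systemd/system/spark-worker.service.d
printf '[Service]\nLimitNOFILE=1048576\n' | sudo tee /etc/systemd/system/spark-worker.service.d/limits.conf
sudo systemctl daemon-reload
sudo systemctl restart spark-worker
```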
Re: Too many open files, why changing ulimit not effecting?
For CentOS there's also /etc/security/limits.d/90-nproc.conf that may need modification. Services that you expect to use the new limits need to be restarted. The simplest thing to do is to reboot the machine.
Re: Too many open files, why changing ulimit not effecting?
bq. and "session required pam_limits.so".

What was the second file you modified?

Did you make the change on all the nodes?

Please see the verification step in https://easyengine.io/tutorials/linux/increase-open-files-limit/
Too many open files, why changing ulimit not effecting?
Hello all,

I'm getting the famous java.io.FileNotFoundException: ... (Too many open files) exception. What seemed to have helped people out hasn't worked for me. I tried to set the ulimit via the command line ("ulimit -n"), then I tried to add the following lines to the "/etc/security/limits.conf" file:

* - nofile 100
root soft nofile 100
root hard nofile 100
hduser soft nofile 100
hduser hard nofile 100

...then I added the line "session required pam_limits.so" to the two files "/etc/pam.d/common-session" and "/etc/pam.d/common-session-noninteractive". Then I logged out and logged back in. First I tried only the first line (* - nofile 100), then added the 2nd and 3rd (root...), then added the last two lines (hduser...), with no effect. Weirdly enough, when I check with the command "ulimit -n" it returns the correct value of 100.

I then added "ulimit -n 100" to "spark-env.sh" on the master and on each of my workers, with no effect.

What else could it be besides changing the ulimit setting? If it's only that, what could cause Spark to ignore it?

I'll appreciate any help in advance.

--
PhD Student - EIS Group - Bonn University, Germany.
+49 1575 8482232
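When the shell reports the right ulimit but Spark still hits "Too many open files", one way to narrow it down is to read the limits of the running Spark JVM itself, since a process started by a daemon or init system does not re-read limits.conf. The pid lookup below is a placeholder; adapt it to your deployment:

```shell
# What the interactive shell believes
ulimit -n

# What an already-running process actually has. Replace $$ with a Spark
# worker/executor pid, e.g. from "jps" or "pgrep -f spark" (assumption:
# Linux /proc layout).
PID=$$
grep "Max open files" "/proc/$PID/limits"
```

If the two values disagree, the limit was set for login shells but never reached the service that launches the executors.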
Re: java.io.FileNotFoundException(Too many open files) in Spark streaming
The line of code which I highlighted in the screenshot is within the Spark source code. Spark implements a sort-based shuffle, and the spilled files are merged using merge sort. Here is the link https://issues.apache.org/jira/secure/attachment/12655884/Sort-basedshuffledesign.pdf which conveys the same.

On Wed, Jan 6, 2016 at 8:19 PM, Annabel Melongo wrote:
> Priya,
>
> It would be helpful if you put the entire trace log along with your code to help determine the root cause of the error.
>
> Thanks
>
> On Wednesday, January 6, 2016 4:00 AM, Priya Ch <learnings.chitt...@gmail.com> wrote:
>
> Running 'lsof' will let us know the open files, but how do we come to know the root cause behind opening too many files?
>
> Thanks,
> Padma CH
>
> On Wed, Jan 6, 2016 at 8:39 AM, Hamel Kothari wrote:
>
> The "Too Many Files" part of the exception is just indicative of the fact that when that call was made, too many files were already open. It doesn't necessarily mean that that line is the source of all of the open files; that's just the point at which it hit its limit.
>
> What I would recommend is to try to run this code again and use "lsof" on one of the Spark executors (perhaps run it in a for loop, writing the output to separate files) until it fails, and see which files are being opened. If there's anything that seems to be taking up a clear majority, that might key you in on the culprit.
>
> On Tue, Jan 5, 2016 at 9:48 PM Priya Ch wrote:
>
> Yes, the FileInputStream is closed. Maybe I didn't show it in the screenshot.
>
> As Spark implements sort-based shuffle, there is a parameter called maximum merge factor which decides the number of files that can be merged at once, and this avoids too many open files. I am suspecting that it is something related to this.
>
> Can someone confirm this?
>
> On Tue, Jan 5, 2016 at 11:19 PM, Annabel Melongo <melongo_anna...@yahoo.com> wrote:
>
> Vijay,
>
> Are you closing the FileInputStream at the end of each loop (in.close())? My guess is those streams aren't closed, and thus the "too many open files" exception.
>
> On Tuesday, January 5, 2016 8:03 AM, Priya Ch wrote:
>
> Can someone throw light on this?
>
> Regards,
> Padma Ch
>
> On Mon, Dec 28, 2015 at 3:59 PM, Priya Ch wrote:
>
> Chris, we are using Spark version 1.3.0. We have not set the spark.streaming.concurrentJobs parameter; it takes the default value.
>
> Vijay,
>
> From the stack trace it is evident that org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730) is throwing the exception. I opened the Spark source code and visited the line which is throwing this exception, i.e.
>
> [image: Inline image 1]
>
> The line which is marked in red is throwing the exception. The file is ExternalSorter.scala in the org.apache.spark.util.collection package.
>
> I went through the following blog http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/ and understood that there is a merge factor which decides the number of on-disk files that can be merged. Is it somehow related to this?
>
> Regards,
> Padma CH
>
> On Fri, Dec 25, 2015 at 7:51 PM, Chris Fregly wrote:
>
> And which version of Spark/Spark Streaming are you using?
>
> Are you explicitly setting spark.streaming.concurrentJobs to something larger than the default of 1?
>
> If so, please try setting that back to 1 and see if the problem still exists.
>
> This is a dangerous parameter to modify from the default, which is why it's not well-documented.
>
> On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge wrote:
>
> Few indicators -
>
> 1) During execution time, check the total number of open files using the lsof command. Needs root permissions. If it is a cluster, not sure much!
> 2) Which exact line in the code is triggering this error? Can you paste that snippet?
>
> On Wednesday 23 December 2015, Priya Ch wrote:
>
> ulimit -n 65000
>
> fs.file-max = 65000 (in /etc/sysctl.conf file)
>
> Thanks,
> Padma Ch
>
> On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma wrote:
>
> Could you share the ulimit for your setup please?
>
> - Thanks, via mobile, excuse brevity.
>
> On Dec 22, 2015 6:39 PM, "Priya Ch" wrote:
>
> Jakob,
>
> Increased the settings like fs.file-max in /etc/sysctl.conf and also increased user limit in /etc/security/limits.conf. But still see the same issue.
>
> On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky wrote:
>
> It might be a good idea to see how many files are open and try increasing the open file limit (this is done on an OS level). In some application use-cases it is actually a legitimate need.
>
> If that doesn't help, make sure you close any unused files and streams in your code. It will also be easier to help diagnose the issue if you send an error-reproducing snippet.
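The lsof-in-a-loop suggestion quoted above can be sketched like this; the pid, sample count, interval, and output paths are all illustrative assumptions:

```shell
# Snapshot a process's open file descriptors a few times, one file per
# sample, so the snapshots can be diffed after the job fails.
# /proc/<pid>/fd lists the same descriptors that "lsof -p <pid>" would show.
PID=${1:-$$}                     # default to this shell for demonstration
for i in 1 2 3; do
  ls -l "/proc/$PID/fd" > "/tmp/openfds.$PID.$i"
  sleep 1
done
wc -l /tmp/openfds."$PID".*      # rough open-file count per snapshot
```

Pointing PID at a Spark executor and raising the sample count gives a timeline of which files accumulate before the limit is hit.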
Re: java.io.FileNotFoundException(Too many open files) in Spark streaming
Running 'lsof' will let us know the open files, but how do we come to know the root cause behind opening too many files?

Thanks,
Padma CH
Re: java.io.FileNotFoundException(Too many open files) in Spark streaming
Yes, the FileInputStream is closed. Maybe I didn't show it in the screenshot.

As Spark implements sort-based shuffle, there is a parameter called maximum merge factor which decides the number of files that can be merged at once, and this avoids too many open files. I am suspecting that it is something related to this.

Can someone confirm this?
Re: java.io.FileNotFoundException(Too many open files) in Spark streaming
Vijay,

Are you closing the FileInputStream at the end of each loop (in.close())? My guess is those streams aren't closed, and thus the "too many open files" exception.
Re: java.io.FileNotFoundException(Too many open files) in Spark streaming
Can someone throw light on this?

Regards,
Padma Ch
Re: java.io.FileNotFoundException(Too many open files) in Spark streaming
Chris, we are using spark 1.3.0 version. we have not set spark.streaming.concurrentJobs this parameter. It takes the default value. Vijay, From the tack trace it is evident that org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730) is throwing the exception. I opened the spark source code and visited the line which is throwing this exception i.e [image: Inline image 1] The lie which is marked in red is throwing the exception. The file is ExternalSorter.scala in org.apache.spark.util.collection package. i went through the following blog http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/ and understood that there is merge factor which decide the number of on-disk files that could be merged. Is it some way related to this ? Regards, Padma CH On Fri, Dec 25, 2015 at 7:51 PM, Chris Fregly wrote: > and which version of Spark/Spark Streaming are you using? > > are you explicitly setting the spark.streaming.concurrentJobs to > something larger than the default of 1? > > if so, please try setting that back to 1 and see if the problem still > exists. > > this is a dangerous parameter to modify from the default - which is why > it's not well-documented. > > > On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge > wrote: > >> Few indicators - >> >> 1) during execution time - check total number of open files using lsof >> command. Need root permissions. If it is cluster not sure much ! >> 2) which exact line in the code is triggering this error ? Can you paste >> that snippet ? >> >> >> On Wednesday 23 December 2015, Priya Ch >> wrote: >> >>> ulimit -n 65000 >>> >>> fs.file-max = 65000 ( in etc/sysctl.conf file) >>> >>> Thanks, >>> Padma Ch >>> >>> On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma wrote: >>> Could you share the ulimit for your setup please ? - Thanks, via mobile, excuse brevity. 
Re: java.io.FileNotFoundException(Too many open files) in Spark streaming
And which version of Spark/Spark Streaming are you using?

Are you explicitly setting spark.streaming.concurrentJobs to something larger than the default of 1? If so, please try setting it back to 1 and see if the problem still exists.

This is a dangerous parameter to change from the default, which is why it is not well documented.
java.io.FileNotFoundException(Too many open files) in Spark streaming
A few indicators:

1) During execution, check the total number of open files using the lsof command. Root permissions are needed; on a cluster you would have to check each node.
2) Which exact line in the code is triggering this error? Can you paste that snippet?

--
Regards,
Vijay Gharge
Re: java.io.FileNotFoundException(Too many open files) in Spark streaming
ulimit -n 65000

fs.file-max = 65000 (in the /etc/sysctl.conf file)

Thanks,
Padma Ch
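For reference, a sketch of where those two settings live so they persist across reboots. The `spark` user name is a placeholder and the 65000 value is just the figure from this thread; actually writing these files requires root, so this only renders the config lines:

```shell
# Render the persistent config lines referenced above; writing them to
# /etc/security/limits.conf and /etc/sysctl.conf requires root.
limits_conf='spark  soft  nofile  65000
spark  hard  nofile  65000'
sysctl_conf='fs.file-max = 65000'

echo "# /etc/security/limits.conf -- per-user open-file (nofile) limits"
echo "$limits_conf"
echo "# /etc/sysctl.conf -- system-wide handle ceiling (reload with: sysctl -p)"
echo "$sysctl_conf"
```

Note that limits.conf changes only take effect for new login sessions, and a long-running daemon started before the change keeps its old limits.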
Re: java.io.FileNotFoundException(Too many open files) in Spark streaming
Could you share the ulimit for your setup please?

- Thanks, via mobile, excuse brevity.
Re: java.io.FileNotFoundException(Too many open files) in Spark streaming
Jakob,

I increased the settings like fs.file-max in /etc/sysctl.conf and also increased the user limit in /etc/security/limits.conf, but I still see the same issue.
Re: java.io.FileNotFoundException(Too many open files) in Spark streaming
It might be a good idea to see how many files are open and try increasing the open file limit (this is done at the OS level). In some application use cases it is actually a legitimate need.

If that doesn't help, make sure you close any unused files and streams in your code. It will also be easier to help diagnose the issue if you send an error-reproducing snippet.
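A quick way to do both checks at once on Linux (a sketch: it assumes a /proc filesystem and inspects the current shell as a stand-in; in practice you would substitute the PID of a Spark executor):

```shell
# Count a process's open file descriptors via /proc and show the shell's
# soft limit for comparison. Replace $$ with an executor PID in practice.
pid=$$
fd_count=$(ls "/proc/$pid/fd" | wc -l)
soft_limit=$(ulimit -n)
echo "pid=$pid open_fds=$fd_count soft_limit=$soft_limit"
```

If `open_fds` is anywhere near `soft_limit` during normal operation, either the limit is too low for the workload or something is leaking descriptors.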
java.io.FileNotFoundException(Too many open files) in Spark streaming
Hi All,

When running a streaming application, I am seeing the below error:

java.io.FileNotFoundException: /data1/yarn/nm/usercache/root/appcache/application_1450172646510_0004/blockmgr-a81f42cd-6b52-4704-83f3-2cfc12a11b86/02/temp_shuffle_589ddccf-d436-4d2c-9935-e5f8c137b54b (Too many open files)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:146)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:729)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

It looks like the issue is that in a multi-threaded application there are too many open file handles, and the process has reached the maximum number allowed.

Regards,
Padma Ch
Re: "Too many open files" exception on reduceByKey
It turns out that Mesos can overwrite the OS ulimit -n setting, so we have increased the Mesos slave ulimit -n setting.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p25019.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: "Too many open files" exception on reduceByKey
You are right, I did find that Mesos overwrites this to a smaller number. So we will modify that and try to run again. Thanks!

Tian
Re: "Too many open files" exception on reduceByKey
Try running the following to see the actual ulimit on the executors. We found that Mesos overrides the ulimit, which causes the issue.

import sys.process._
val p = 1 to 100
val rdd = sc.parallelize(p, 100)
val a = rdd.map(x => Seq("sh", "-c", "ulimit -n").!!.toDouble.toLong).collect

Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D <https://pgp.mit.edu/pks/lookup?search=0x59DF55B8AF08DF8D>
Re: "Too many open files" exception on reduceByKey
I hit this issue with a Spark 1.3.0 stateful application (using the updateStateByKey function) on Mesos. It fails after running fine for about 24 hours. The error stack trace is below. I checked ulimit -n and we have very large numbers set on the machines. What else could be wrong?

15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 113727.0 (TID 833758, ip-10-112-10-221.ec2.internal): java.io.FileNotFoundException: /media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index (Too many open files)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
    at org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Re: Too many open files issue
I don't think that is the issue. I have it set up to run in a thread pool, but I have set the pool size to 1 for this test until I get this resolved.

I am having some problems with using the Spark web portal, since it picks a random port and, with the way my environment is set up, by the time I have figured out which port it is using the job has finished. But what I did do is add some logging, and I added collecting the RDD record count to make sure the last logging statements were in fact executed after the RDD processing ran. I added the logging statements in the job flow:

val directories = findModifiedFiles()
directories.foreach(directory => {
  log 'Starting directory processor for $directory'
  rdd = sparkContext.newAPIHadoopFile(directory)
    .filter(...)
    .map(...)
    .reduceByKey(...)
  rdd.foreachPartition(iterator => {
    iterator.foreach(tuple => {
      // send data to kafka
    })
  })
  val count = rdd.count
  log 'Processed $count records for $directory'
  log 'Finished directory processor for $directory'
})

This results in the log lines below until the "Too many open files in system" errors started happening, after which it only printed the first log line for each iteration (as expected, since it's throwing an exception).
Starting directory processor for /user/oozie/topics/testtopic/hourly/2015/04/15
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/15
Finished directory processor for /user/oozie/topics/testtopic/hourly/2015/04/15
Starting directory processor for /user/oozie/topics/testtopic/hourly/2015/04/16
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/16
Finished directory processor for /user/oozie/topics/testtopic/hourly/2015/04/16
Starting directory processor for /user/oozie/topics/testtopic/hourly/2015/04/17
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/17
Finished directory processor for /user/oozie/topics/testtopic/hourly/2015/04/17
Starting directory processor for /user/oozie/topics/testtopic/hourly/2015/04/18
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/18
Finished directory processor for /user/oozie/topics/testtopic/hourly/2015/04/18
Starting directory processor for /user/oozie/topics/testtopic/hourly/2015/04/19
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/19
Finished directory processor for /user/oozie/topics/testtopic/hourly/2015/04/19
Starting directory processor for /user/oozie/topics/testtopic/hourly/2015/04/20

Just to see what would happen if I made the job run slower and had GC run more frequently (in case it had something to do with that), I added the following to each loop iteration:

System.gc()
Thread.sleep(5000)

But besides making the job run a lot longer, it did not change anything.

Sigurd
Re: Too many open files issue
Here is the code in which NewHadoopRDD registers a close handler that is called when the task is completed (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136).

From my understanding, the reason may be that this `foreach` in your implementation does not execute the Spark jobs one by one in the loop as expected; on the contrary, all the jobs are submitted to the DAGScheduler simultaneously. Since each job has no dependency on the others, Spark's scheduler will unwrap the loop and submit the jobs in parallel, so several map stages may be running and pending at once, and this runs your node out of file handles.

You could check the Spark web portal to see whether several map stages are running simultaneously, or whether some are running while others are pending.

Thanks
Jerry
Re: Too many open files issue
Ah, now that does sound suspicious...

On 2 Sep 2015, at 14:09, Sigurd Knippenberg <sig...@knippenberg.com> wrote:

> Yep. I know. It was set to 32K when I ran this test. If I bump it to 64K the issue goes away. It still doesn't make sense to me that the Spark job doesn't release its file handles until the end of the job instead of doing that while my loop iterates.
Re: Too many open files issue
Yep. I know. It was set to 32K when I ran this test. If I bump it to 64K the issue goes away. It still doesn't make sense to me that the Spark job doesn't release its file handles until the end of the job instead of doing that while my loop iterates.

Sigurd
Re: Too many open files issue
On 31 Aug 2015, at 19:49, Sigurd Knippenberg <sig...@knippenberg.com> wrote:

> I know I can adjust the max open files allowed by the OS but I'd rather fix the underlying issue.

Bumping up the OS handle limits is step #1 of installing a Hadoop cluster:

https://wiki.apache.org/hadoop/TooManyOpenFiles
Too many open files issue
I am running into a 'too many open files' issue, and before I posted this I searched the web to see if anyone already had a solution to my particular problem, but I did not see anything that helped. I know I can adjust the max open files allowed by the OS, but I'd rather fix the underlying issue.

In HDFS I have a directory that contains hourly files in a directory structure like this: directoryname/hourly/2015/06/02/13 (where the numbers at the end are the date in YYYY/MM/DD/HH format).

I have a Spark job that roughly does the following:

val directories = findModifiedFiles()
directories.foreach(directory => {
  sparkContext.newAPIHadoopFile(directory)
    .filter(...)
    .map(...)
    .reduceByKey(...)
    .foreachPartition(iterator => {
      iterator.foreach(tuple => {
        // send data to kafka
      })
    })
})

If only a few directories have been modified, this works pretty well. But when I have the job reprocess all the data (I have 350M of test data that has data for pretty much every hour of every day for a full year), I run out of file handles. I executed the test on a test cluster of 2 Hadoop slave nodes, each running the HDFS data node and the YARN node manager.
When I run "lsof -p" on the Spark processes, I see a lot of the following types of open files:

java  21196  yarn  3268r  REG  8,16    139  533320  /hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir17/blk_1073746324_5500.meta
java  21196  yarn  3269r  REG  8,16  15004  533515  /hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir17/blk_1073746422
java  21196  yarn  3270r  REG  8,16    127  533516  /hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir17/blk_1073746422_5598.meta
java  21196  yarn  3271r  REG  8,16  15583  534081  /hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir19/blk_1073746704
java  21196  yarn  3272r  REG  8,16    131  534082  /hadoop/hdfs/data/current/BP-479153573-10.240.60.21-1441026862238/current/finalized/subdir0/subdir19/blk_1073746704_5880.meta

When I watch the process run, I can see that the number of those open files keeps increasing while it is processing directories, until it runs out of file handles (the count then drops to zero and climbs again until it runs out again, but that is because YARN retries the job). It basically ends up opening about 4500 file handles to those files per node.

As I said, I know I can increase the number of open file handles, and I will do that, but in my opinion the count should not be ever-increasing. I would have thought that when I was done with an RDD, Spark would close all the resources it opened for it (so that it would close the file handles after each iteration of the directories.foreach loop). I looked for a close() method or something like that on the RDD but couldn't find one. Am I doing something that is causing Spark not to close the file handles? Should I write this job differently?

Thanks,
Sigurd
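One way to watch that growth from the outside is to sample the descriptor count over time (a hedged sketch, Linux-only; the PID argument and one-second interval are placeholders, and for demonstration it falls back to the current shell instead of a Spark executor):

```shell
# Sample a process's open-descriptor count a few times to see whether it
# grows without bound. Pass an executor PID as $1; defaults to this shell.
pid="${1:-$$}"
for sample in 1 2 3; do
  count=$(ls "/proc/$pid/fd" 2>/dev/null | wc -l)
  echo "sample $sample: $count open fds for pid $pid"
  sleep 1
done
```

A steadily climbing count across samples, with no plateau between loop iterations, is consistent with handles being held until the end of the job rather than released per iteration.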
RE: Too many open files
Maybe you forgot to close a reader or writer object.
RE: Too many open files
Thank you both, I will take a look, but:

1. For high-shuffle tasks, is it right for the system to have the sizes and thresholds this high? I hope there are no bad consequences.

2. I will try to get around the need for admin access and see if I can achieve anything with only user rights.
Re: Too many open files
Please increase the limit for open files:

http://stackoverflow.com/questions/34588/how-do-i-change-the-number-of-open-files-limit-in-linux
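The gist of that link: check the soft and hard limits separately, since a non-root user can raise the soft limit only up to the hard limit (a sketch; raising the hard limit itself, or making the change permanent, needs root and /etc/security/limits.conf):

```shell
# Show soft and hard descriptor limits, then raise the soft limit to the
# hard limit for this shell session only.
soft_before=$(ulimit -Sn)
hard=$(ulimit -Hn)
echo "soft=$soft_before hard=$hard"
ulimit -Sn "$hard" 2>/dev/null || true
echo "soft now: $(ulimit -Sn)"
```

Since the limit is per-process and inherited at fork time, it has to be raised in the environment that launches the Spark daemons, not just in an interactive shell.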
Re: Too many open files
You probably should increase the file-handle limit for the user that all processes (Spark master & workers) run as, e.g.:
http://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/
Too many open files
Hello,

I've seen a couple emails on this issue but could not find anything to solve my situation.

I tried to reduce the partitioning level, enabled consolidateFiles, and increased the sizeInFlight limit, but still no help. The spill manager is sort, which is the default. Any advice?

15/07/29 10:37:01 WARN TaskSetManager: Lost task 34.0 in stage 11.0 (TID 331, localhost): FetchFailed(BlockManagerId(driver, localhost, 43437), shuffleId=3, mapId=0, reduceId=34, message=
org.apache.spark.shuffle.FetchFailedException: /tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/0d/shuffle_3_0_0.index (Too many open files)
..
..
15/07/29 10:37:01 INFO Executor: Executor is trying to kill task 9.0 in stage 11.0 (TID 306)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 11.0 failed 1 times, most recent failure: Lost task 20.0 in stage 11.0 (TID 317, localhost): java.io.FileNotFoundException: /tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/1b/temp_shuffle_a3a9815a-677a-4342-94a2-1e083d758bcc (Too many open files)

My fs is ext4 and currently ulimit -n is 1024.

Thanks,
Saif
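For context on why "reduce the partitioning level" is the usual first advice here, a back-of-envelope sketch in plain Scala. The assumption behind it: the old hash-based shuffle (without consolidation) writes roughly one file per map task per reduce task, while the sort-based shuffle writes about two files (data + index) per map task. The numbers are illustrative, not measured from this job.

```scala
// Rough file counts for the two shuffle implementations (assumption: hash
// shuffle without consolidation writes mapTasks * reduceTasks files; sort
// shuffle writes a data file plus an index file per map task).
def hashShuffleFiles(mapTasks: Int, reduceTasks: Int): Long =
  mapTasks.toLong * reduceTasks

def sortShuffleFiles(mapTasks: Int): Long =
  2L * mapTasks

val m = 200
val r = 200
// Even modest partition counts blow past a 1024 ulimit under hash shuffle,
// which is why lowering the partitioning level (or raising ulimit -n) helps.
println(s"hash shuffle: ${hashShuffleFiles(m, r)} files, sort shuffle: ${sortShuffleFiles(m)} files")
```

Note that even with sort shuffle, the fetch side can hold many descriptors open at once during a wide shuffle, so the per-process limit still matters.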
Re: Too many open files when using Spark to consume messages from Kafka
I terminated the old job and started a new one. Currently, the Spark Streaming job has been running for 2 hours, and when I use lsof, I do not see many files related to the Spark job. BTW, I am running Spark Streaming in local[2] mode. The batch is 5 seconds and it has around 50 lines to read in each batch.
Re: Too many open files when using Spark to consume messages from Kafka
Did you use lsof to see what files were opened during the job?
Re: Too many open files when using Spark to consume messages from Kafka
The data ingestion is in the outermost portion of the foreachRDD block. Although I now close the jdbc statement, the same exception happened again. It seems it is not related to the data ingestion part.
Re: Too many open files when using Spark to consume messages from Kafka
Use lsof to see what files are actually being held open.

That stacktrace looks to me like it's from the driver, not executors. Where in foreach is it being called? The outermost portion of foreachRDD runs in the driver, the innermost portion runs in the executors. From the docs:

https://spark.apache.org/docs/latest/streaming-programming-guide.html

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record)  // executed at the worker
  }
}

@td I've specifically looked at kafka socket connections for the standard 1.3 code vs my branch that has cached connections. The standard non-caching code has very short-lived connections. I've had jobs running for a month at a time, including ones writing to mysql. Not saying it's impossible, but I'd think we need some evidence before speculating this has anything to do with it.
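The driver/worker split quoted above is also why a connection-per-record pattern exhausts descriptors quickly. A minimal sketch of the connection-per-partition variant from the same streaming guide, simulated here with plain Scala collections so the open count is visible without a Spark dependency (StubConnection is a hypothetical stand-in for a real JDBC/socket connection):

```scala
// Stub standing in for a JDBC/socket connection; counts how many are opened.
class StubConnection {
  StubConnection.opened += 1
  def send(record: String): Unit = ()
  def close(): Unit = ()
}
object StubConnection { var opened = 0 }

val records = (1 to 100).map(i => s"record-$i")
val partitions = records.grouped(25).toSeq // stands in for an RDD's 4 partitions

// Equivalent of rdd.foreachPartition { iter => ... }: one connection per
// partition rather than one per record (100 opens) or one driver-side
// connection that would have to be serialized out to the workers.
partitions.foreach { part =>
  val conn = new StubConnection
  part.foreach(conn.send)
  conn.close()
}
println(s"connections opened: ${StubConnection.opened}") // 4, one per partition
```

With real Spark the outer loop would be `rdd.foreachPartition`, and a connection pool (as discussed later in the thread) would reduce the open count further across batches.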
Re: Too many open files when using Spark to consume messages from Kafka
This function is called in foreachRDD. I think it should be running in the executors. I added statement.close() in the code and it is running. I will let you know if this fixes the issue.
Re: Too many open files when using Spark to consume messages from Kafka
Also cc'ing Cody. @Cody, maybe there is a reason for doing connection pooling even if there is no performance difference.

TD
Re: Too many open files when using Spark to consume messages from Kafka
Is the function ingestToMysql running on the driver or on the executors? Accordingly, you can try debugging while running in a distributed manner, with and without calling the function.

If you don't get "too many open files" without calling ingestToMysql(), the problem is likely to be in ingestToMysql(). If you get the problem even without calling ingestToMysql(), then the problem may be in Kafka. If the problem is occurring in the driver, then it's the DirectKafkaInputDStream code. If the problem is occurring in the executor, then the problem is in KafkaRDD.

TD
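One way to narrow down the driver-vs-executor question above is to log the process's own open file-descriptor count periodically on each side. A sketch using the JDK's management API; note that `com.sun.management.UnixOperatingSystemMXBean` is HotSpot/OpenJDK-specific and Unix-only, so the cast is guarded:

```scala
import java.lang.management.ManagementFactory
import com.sun.management.UnixOperatingSystemMXBean

// Returns this JVM's open file-descriptor count, or -1 if the platform/JVM
// does not expose it. Logged from the driver and from inside foreachPartition,
// a steadily climbing number points at where the leak lives.
def openFdCount(): Long =
  ManagementFactory.getOperatingSystemMXBean match {
    case unix: UnixOperatingSystemMXBean => unix.getOpenFileDescriptorCount
    case _                               => -1L
  }

println(s"open fds: ${openFdCount()}")
```

This complements lsof: lsof shows which files are open, while a counter logged per batch shows whether the number grows over time.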
Re: Too many open files when using Spark to consume messages from Kafka
Maybe add statement.close() in finally block ? Streaming / Kafka experts may have better insight. On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay wrote: > Thanks for the suggestion. I ran the command and the limit is 1024. > > Based on my understanding, the connector to Kafka should not open so many > files. Do you think there is possible socket leakage? BTW, in every batch > which is 5 seconds, I output some results to mysql: > > def ingestToMysql(data: Array[String]) { > val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123" > var sql = "insert into loggingserver1 values " > data.foreach(line => sql += line) > sql = sql.dropRight(1) > sql += ";" > logger.info(sql) > var conn: java.sql.Connection = null > try { > conn = DriverManager.getConnection(url) > val statement = conn.createStatement() > statement.executeUpdate(sql) > } catch { > case e: Exception => logger.error(e.getMessage()) > } finally { > if (conn != null) { > conn.close > } > } > } > > I am not sure whether the leakage originates from Kafka connector or the > sql connections. > > Bill > > On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu wrote: > >> Can you run the command 'ulimit -n' to see the current limit ? 
>> >> To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf* >> Cheers >> >> On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay >> wrote: >> >>> Hi all, >>> >>> I am using the direct approach to receive real-time data from Kafka in >>> the following link: >>> >>> https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html >>> >>> >>> My code follows the word count direct example: >>> >>> >>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala >>> >>> >>> >>> After around 12 hours, I got the following error messages in Spark log: >>> >>> 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time >>> 143033869 ms >>> org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too >>> many open files, java.io.IOException: Too many open files, >>> java.io.IOException: Too many open files, java.io.IOException: Too many >>> open files, java.io.IOException: Too many open files) >>> at >>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94) >>> at >>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116) >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) >>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) >>> at scala.Option.orElse(Option.scala:257) >>> at >>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) >>> at >>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) >>> at >>> 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) >>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) >>> at scala.Option.orElse(Option.scala:257) >>> at >>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) >>> at >>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) >>> at ...
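The statement.close() suggestion above can be folded into the quoted ingestToMysql. Below is a hedged Scala sketch, not the poster's actual code: it assumes each input line is a single value for a one-column table (the real schema isn't shown in the thread), and it switches the string-concatenated INSERT to a parameterized batch so neither the Statement nor its socket leaks on failure:

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

// Sketch only: assumes a one-column table named loggingserver1;
// adapt the SQL text and the setters to the real schema.
def ingestToMysql(data: Array[String]): Unit = {
  val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
  var conn: Connection = null
  var stmt: PreparedStatement = null
  try {
    conn = DriverManager.getConnection(url)
    stmt = conn.prepareStatement("insert into loggingserver1 values (?)")
    data.foreach { line =>
      stmt.setString(1, line)
      stmt.addBatch() // batch rows instead of concatenating one giant string
    }
    stmt.executeBatch()
  } finally {
    // Close the Statement before the Connection so neither leaks a handle,
    // even when executeBatch throws.
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}
```

If sockets still accumulate after this change, the leak is more likely in the Kafka consumer side than in JDBC.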
Re: Too many open files when using Spark to consume messages from Kafka
Thanks for the suggestion. I ran the command and the limit is 1024. Based on my understanding, the connector to Kafka should not open so many files. Do you think there is possible socket leakage? BTW, in every batch which is 5 seconds, I output some results to mysql: def ingestToMysql(data: Array[String]) { val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123" var sql = "insert into loggingserver1 values " data.foreach(line => sql += line) sql = sql.dropRight(1) sql += ";" logger.info(sql) var conn: java.sql.Connection = null try { conn = DriverManager.getConnection(url) val statement = conn.createStatement() statement.executeUpdate(sql) } catch { case e: Exception => logger.error(e.getMessage()) } finally { if (conn != null) { conn.close } } } I am not sure whether the leakage originates from Kafka connector or the sql connections. Bill On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu wrote: > Can you run the command 'ulimit -n' to see the current limit ? > > To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf* > Cheers > > On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay > wrote: > >> Hi all, >> >> I am using the direct approach to receive real-time data from Kafka in >> the following link: >> >> https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html >> >> >> My code follows the word count direct example: >> >> >> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala >> >> >> >> After around 12 hours, I got the following error messages in Spark log: >> >> 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time >> 143033869 ms >> org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too >> many open files, java.io.IOException: Too many open files, >> java.io.IOException: Too many open files, java.io.IOException: Too many >> open files, java.io.IOException: Too many open files) >> at >> 
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94) >> at >> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116) >> at >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) >> at >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) >> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) >> at >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) >> at >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) >> at scala.Option.orElse(Option.scala:257) >> at >> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) >> at >> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) >> at >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) >> at >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) >> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) >> at >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) >> at >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) >> at scala.Option.orElse(Option.scala:257) >> at >> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) >> at >> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) >> at >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) >> at >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) >> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) >> at >> 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) >> at >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) >> at scala.Option.orElse(Option.scala:257) >> at >> org.apache.spark.streaming.
Re: Too many open files when using Spark to consume messages from Kafka
Can you run the command 'ulimit -n' to see the current limit ? To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf* Cheers On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay wrote: > Hi all, > > I am using the direct approach to receive real-time data from Kafka in the > following link: > > https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html > > > My code follows the word count direct example: > > > https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala > > > > After around 12 hours, I got the following error messages in Spark log: > > 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time > 143033869 ms > org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many > open files, java.io.IOException: Too many open files, java.io.IOException: > Too many open files, java.io.IOException: Too many open files, > java.io.IOException: Too many open files) > at > org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94) > at > org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) > at scala.Option.orElse(Option.scala:257) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) > at > org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) > at > 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) > at scala.Option.orElse(Option.scala:257) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) > at > org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) > at scala.Option.orElse(Option.scala:257) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) > at > org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) > at scala.Option.orElse(Option.scala:257) > at > 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) > at > org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply
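Ted's 'ulimit -n' check can be run directly from the shell that launches the driver or executors. A minimal sketch (the raise-to-hard-limit step is illustrative; the thread itself does not prescribe a value):

```shell
#!/bin/sh
# Show the soft and hard limits on open file descriptors for this shell.
echo "soft: $(ulimit -Sn)"
echo "hard: $(ulimit -Hn)"

# A process (and children launched from this shell, e.g. Spark executors)
# may raise its soft limit up to, but not beyond, the hard limit.
ulimit -n "$(ulimit -Hn)" 2>/dev/null || echo "could not raise soft limit"
echo "soft after raise: $(ulimit -Sn)"

# On Linux, count the descriptors a process currently holds (here: this
# shell; substitute a Spark executor PID to watch for a leak over time).
[ -d "/proc/$$/fd" ] && ls "/proc/$$/fd" | wc -l || true
```

Watching the /proc fd count grow batch after batch is a quick way to tell a genuine leak from a limit that is simply too low.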
Too many open files when using Spark to consume messages from Kafka
Hi all, I am using the direct approach to receive real-time data from Kafka in the following link: https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html My code follows the word count direct example: https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala After around 12 hours, I got the following error messages in Spark log: 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time 143033869 ms org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at 
scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at
Re: Too many open files
I'm executing my application in local mode (with --master local[*]). I'm using Ubuntu and I've put "session required pam_limits.so" into /etc/pam.d/common-session but it doesn't work On Mon, Mar 30, 2015 at 4:08 PM, Ted Yu wrote: > bq. In /etc/security/limits.conf set the next values: > > Have you done the above modification on all the machines in your Spark > cluster? > > If you use Ubuntu, be sure that the /etc/pam.d/common-session file > contains the following line: > > session required pam_limits.so > > > On Mon, Mar 30, 2015 at 5:08 AM, Masf wrote: > >> Hi. >> >> I've done relogin; in fact, I ran 'ulimit -n' and it returns 1000000, but it >> still crashes. >> I'm doing reduceByKey and SparkSQL mixed over 17 files (250MB-500MB/file) >> >> >> Regards. >> Miguel Angel. >> >> On Mon, Mar 30, 2015 at 1:52 PM, Akhil Das >> wrote: >> >>> Mostly, you will have to restart the machines to get the ulimit effect >>> (or relogin). What operation are you doing? Are you doing too many >>> repartitions? >>> >>> Thanks >>> Best Regards >>> >>> On Mon, Mar 30, 2015 at 4:52 PM, Masf wrote: >>> >>>> Hi >>>> >>>> I have a problem with temp data in Spark. I have fixed >>>> spark.shuffle.manager to "SORT". In /etc/security/limits.conf set the next >>>> values: >>>> * soft nofile 1000000 >>>> * hard nofile 1000000 >>>> In spark-env.sh set ulimit -n 1000000 >>>> I've restarted the spark service and it continues crashing (Too many >>>> open files) >>>> >>>> How can I resolve it?
I'm executing Spark 1.2.0 in Cloudera 5.3.2 >>>> >>>> java.io.FileNotFoundException: >>>> /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c >>>> (Too many open files) >>>> at java.io.FileOutputStream.open(Native Method) >>>> at java.io.FileOutputStream.(FileOutputStream.java:221) >>>> at >>>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123) >>>> at >>>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360) >>>> at >>>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355) >>>> at scala.Array$.fill(Array.scala:267) >>>> at >>>> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355) >>>> at >>>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) >>>> at >>>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) >>>> at >>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) >>>> at >>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) >>>> at org.apache.spark.scheduler.Task.run(Task.scala:56) >>>> at >>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >>>> at java.lang.Thread.run(Thread.java:745) >>>> 15/03/30 11:54:18 WARN TaskSetManager: Lost task 22.0 in stage 3.0 (TID >>>> 27, localhost): java.io.FileNotFoundException: >>>> /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c >>>> (Too many open files) >>>> at java.io.FileOutputStream.open(Native Method) >>>> at java.io.FileOutputStream.(FileOutputStream.java:221) >>>> at >>>> 
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123) >>>> at >>>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360) >>>> at >>>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355) >>>> at scala.Array$.fill(Array.scala:267) >>>>
Re: Too many open files
bq. In /etc/security/limits.conf set the next values: Have you done the above modification on all the machines in your Spark cluster? If you use Ubuntu, be sure that the /etc/pam.d/common-session file contains the following line: session required pam_limits.so On Mon, Mar 30, 2015 at 5:08 AM, Masf wrote: > Hi. > > I've done relogin; in fact, I ran 'ulimit -n' and it returns 1000000, but it > still crashes. > I'm doing reduceByKey and SparkSQL mixed over 17 files (250MB-500MB/file) > > > Regards. > Miguel Angel. > > On Mon, Mar 30, 2015 at 1:52 PM, Akhil Das > wrote: > >> Mostly, you will have to restart the machines to get the ulimit effect >> (or relogin). What operation are you doing? Are you doing too many >> repartitions? >> >> Thanks >> Best Regards >> >> On Mon, Mar 30, 2015 at 4:52 PM, Masf wrote: >> >>> Hi >>> >>> I have a problem with temp data in Spark. I have fixed >>> spark.shuffle.manager to "SORT". In /etc/security/limits.conf set the next >>> values: >>> * soft nofile 1000000 >>> * hard nofile 1000000 >>> In spark-env.sh set ulimit -n 1000000 >>> I've restarted the spark service and it continues crashing (Too many >>> open files) >>> >>> How can I resolve it?
I'm executing Spark 1.2.0 in Cloudera 5.3.2 >>> >>> java.io.FileNotFoundException: >>> /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c >>> (Too many open files) >>> at java.io.FileOutputStream.open(Native Method) >>> at java.io.FileOutputStream.(FileOutputStream.java:221) >>> at >>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123) >>> at >>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360) >>> at >>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355) >>> at scala.Array$.fill(Array.scala:267) >>> at >>> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355) >>> at >>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) >>> at >>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) >>> at >>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) >>> at >>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) >>> at org.apache.spark.scheduler.Task.run(Task.scala:56) >>> at >>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >>> at java.lang.Thread.run(Thread.java:745) >>> 15/03/30 11:54:18 WARN TaskSetManager: Lost task 22.0 in stage 3.0 (TID >>> 27, localhost): java.io.FileNotFoundException: >>> /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c >>> (Too many open files) >>> at java.io.FileOutputStream.open(Native Method) >>> at java.io.FileOutputStream.(FileOutputStream.java:221) >>> at >>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123) >>> at >>> 
org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360) >>> at >>> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355) >>> at scala.Array$.fill(Array.scala:267) >>> at >>> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355) >>> at >>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) >>> at >>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) >>> at >>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) >>> at >>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) >>> at org.apache.spark.scheduler.Task.run(Task.scala:56) >>> at >>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >>> at java.lang.Thread.run(Thread.java:745) >>> >>> >>> >>> Thanks! >>> -- >>> >>> >>> Regards. >>> Miguel Ángel >>> >> >> > > > -- > > > Saludos. > Miguel Ángel >
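The limits.conf entries discussed in this thread take four whitespace-separated fields: domain, type, item, value. A sketch that writes the intended entries to a scratch file (the 1000000 value mirrors the thread; writing to /tmp instead of the live file is deliberate, since editing /etc/security/limits.conf requires root and a relogin to take effect):

```shell
#!/bin/sh
# /etc/security/limits.conf syntax: <domain> <type> <item> <value>
# Write example entries to a scratch copy rather than the live file.
cat > /tmp/limits.conf.example <<'EOF'
*    soft    nofile    1000000
*    hard    nofile    1000000
EOF

# On Ubuntu the entries only take effect if PAM loads pam_limits for the
# session, i.e. /etc/pam.d/common-session contains the line:
#   session required pam_limits.so
grep nofile /tmp/limits.conf.example
```

After copying such entries into the real file and logging back in, 'ulimit -n' in a fresh shell should report the new value.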
Re: Too many open files
Hi. I've done relogin; in fact, I ran 'ulimit -n' and it returns 1000000, but it still crashes. I'm doing reduceByKey and SparkSQL mixed over 17 files (250MB-500MB/file) Regards. Miguel Angel. On Mon, Mar 30, 2015 at 1:52 PM, Akhil Das wrote: > Mostly, you will have to restart the machines to get the ulimit effect (or > relogin). What operation are you doing? Are you doing too many > repartitions? > > Thanks > Best Regards > > On Mon, Mar 30, 2015 at 4:52 PM, Masf wrote: > >> Hi >> >> I have a problem with temp data in Spark. I have fixed >> spark.shuffle.manager to "SORT". In /etc/security/limits.conf set the next >> values: >> * soft nofile 1000000 >> * hard nofile 1000000 >> In spark-env.sh set ulimit -n 1000000 >> I've restarted the spark service and it continues crashing (Too many open >> files) >> >> How can I resolve it? I'm executing Spark 1.2.0 in Cloudera 5.3.2 >> >> java.io.FileNotFoundException: >> /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c >> (Too many open files) >> at java.io.FileOutputStream.open(Native Method) >> at java.io.FileOutputStream.(FileOutputStream.java:221) >> at >> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123) >> at >> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360) >> at >> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355) >> at scala.Array$.fill(Array.scala:267) >> at >> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355) >> at >> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) >> at >> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) >> at >> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) >> at >> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) >> at 
org.apache.spark.scheduler.Task.run(Task.scala:56) >> at >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >> at java.lang.Thread.run(Thread.java:745) >> 15/03/30 11:54:18 WARN TaskSetManager: Lost task 22.0 in stage 3.0 (TID >> 27, localhost): java.io.FileNotFoundException: >> /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c >> (Too many open files) >> at java.io.FileOutputStream.open(Native Method) >> at java.io.FileOutputStream.(FileOutputStream.java:221) >> at >> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123) >> at >> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360) >> at >> org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355) >> at scala.Array$.fill(Array.scala:267) >> at >> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355) >> at >> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) >> at >> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) >> at >> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) >> at >> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) >> at org.apache.spark.scheduler.Task.run(Task.scala:56) >> at >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >> at java.lang.Thread.run(Thread.java:745) >> >> >> >> Thanks! >> -- >> >> >> Regards. >> Miguel Ángel >> > > -- Saludos. Miguel Ángel
Re: Too many open files
Mostly, you will have to restart the machines to get the ulimit effect (or relogin). What operation are you doing? Are you doing too many repartitions? Thanks Best Regards On Mon, Mar 30, 2015 at 4:52 PM, Masf wrote: > Hi > > I have a problem with temp data in Spark. I have fixed > spark.shuffle.manager to "SORT". In /etc/security/limits.conf set the next > values: > * soft nofile 1000000 > * hard nofile 1000000 > In spark-env.sh set ulimit -n 1000000 > I've restarted the spark service and it continues crashing (Too many open > files) > > How can I resolve it? I'm executing Spark 1.2.0 in Cloudera 5.3.2 > > java.io.FileNotFoundException: > /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c > (Too many open files) > at java.io.FileOutputStream.open(Native Method) > at java.io.FileOutputStream.(FileOutputStream.java:221) > at > org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355) > at scala.Array$.fill(Array.scala:267) > at > org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355) > at > org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) > at > org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:56) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at 
java.lang.Thread.run(Thread.java:745) > 15/03/30 11:54:18 WARN TaskSetManager: Lost task 22.0 in stage 3.0 (TID > 27, localhost): java.io.FileNotFoundException: > /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c > (Too many open files) > at java.io.FileOutputStream.open(Native Method) > at java.io.FileOutputStream.(FileOutputStream.java:221) > at > org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355) > at scala.Array$.fill(Array.scala:267) > at > org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355) > at > org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) > at > org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:56) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > > > > Thanks! > -- > > > Regards. > Miguel Ángel >
Too many open files
Hi I have a problem with temp data in Spark. I have fixed spark.shuffle.manager to "SORT". In /etc/security/limits.conf set the next values: * soft nofile 1000000 * hard nofile 1000000 In spark-env.sh set ulimit -n 1000000 I've restarted the spark service and it continues crashing (Too many open files) How can I resolve it? I'm executing Spark 1.2.0 in Cloudera 5.3.2 java.io.FileNotFoundException: /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c (Too many open files) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.(FileOutputStream.java:221) at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123) at org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360) at org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355) at scala.Array$.fill(Array.scala:267) at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/03/30 11:54:18 WARN TaskSetManager: Lost task 22.0 in stage 3.0 (TID 27, localhost): java.io.FileNotFoundException: /tmp/spark-local-20150330115312-37a7/2f/temp_shuffle_c4ba5bce-c516-4a2a-9e40-56121eb84a8c (Too many open files) at java.io.FileOutputStream.open(Native Method) at 
java.io.FileOutputStream.(FileOutputStream.java:221) at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123) at org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360) at org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355) at scala.Array$.fill(Array.scala:267) at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Thanks! -- Regards. Miguel Ángel
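Two things stand out in the configuration quoted above: 100 is far below even the usual default of 1024, and the limits.conf fields must be separate tokens ("softnofile" is not parsed). A minimal sketch of raising the limit, assuming a Linux box where PAM reads limits.conf; 65536 is just an illustrative value, not a tuned one:

```shell
# Sketch: raise the per-process open-file limit (assumes Linux + pam_limits).
# Add these lines to /etc/security/limits.conf (four separate fields:
# domain, type, item, value):
#
#   *   soft   nofile   65536
#   *   hard   nofile   65536
#
# PAM applies the limits at session start, so log in again (or restart the
# Spark daemons) and verify the effective limit in the session running Spark:
ulimit -n
```

The number printed must be the limit seen by the executor processes themselves; setting it in one shell does not affect daemons started elsewhere.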
RE: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Below is the output:

core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 1967947
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 2024
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 1967947
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

I have set the max open files to 2024 with "ulimit -n 2024", but I see the same issue, and I am not sure whether that is a reasonable setting. Actually I am running a loop: each iteration sorts only 3 GB of data. It runs very quickly in the first iteration and slows down in the second. In each iteration I start and stop the context (because I want to clean up the temp files created under the tmp folder, which take a lot of space). Otherwise, just default settings. My logic, roughly:

for each iteration:
    val sc = new SparkContext(...)
    val data = sc.loadParquet(...)
    data.sortByKey()
    sc.stop()

And I run on an EC2 c3.8xlarge, Amazon Linux AMI 2014.09.2 (HVM).

From: java8964 [mailto:java8...@hotmail.com]
Sent: Friday, March 20, 2015 3:54 PM
To: user@spark.apache.org
Subject: RE: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

Could it be the ulimit for the user running Spark on your nodes? Can you run "ulimit -a" as the user who runs Spark on the executor node? Does the result make sense for the data you are trying to process? Yong

From: szheng.c...@gmail.com
To: user@spark.apache.org
Subject: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Date: Fri, 20 Mar 2015 15:28:26 -0400

Hi All, I am trying to run a simple sort-by on 1.2.1.
And it always gives me the two errors below:

1. 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException: /tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826 (Too many open files)

And then I switch to:

conf.set("spark.shuffle.consolidateFiles", "true")
    .set("spark.shuffle.manager", "SORT")

Then I get the error:

2. Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal): com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)

I roughly know the first issue is caused by the Spark shuffle creating too many local temp files (and I don't know the solution, because my workaround seems to cause other issues), but I am not sure what the second error means. Does anyone know the solution for both cases? Regards, Shuai
RE: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Could it be the ulimit for the user running Spark on your nodes? Can you run "ulimit -a" as the user who runs Spark on the executor node? Does the result make sense for the data you are trying to process? Yong

From: szheng.c...@gmail.com
To: user@spark.apache.org
Subject: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Date: Fri, 20 Mar 2015 15:28:26 -0400

Hi All, I am trying to run a simple sort-by on 1.2.1, and it always gives me the two errors below:

1. 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException: /tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826 (Too many open files)

And then I switch to:

conf.set("spark.shuffle.consolidateFiles", "true")
    .set("spark.shuffle.manager", "SORT")

Then I get the error:

2. Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal): com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)

I roughly know the first issue is caused by the Spark shuffle creating too many local temp files (and I don't know the solution, because my workaround seems to cause other issues), but I am not sure what the second error means. Does anyone know the solution for both cases? Regards, Shuai
Re: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Assuming you are on Linux, what is your /etc/security/limits.conf set for nofile/soft (number of open file handles)? On Fri, Mar 20, 2015 at 3:29 PM Shuai Zheng wrote: > Hi All, > > > > I try to run a simple sort by on 1.2.1. And it always give me below two > errors: > > > > 1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID > 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException: > /tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826 > (Too many open files) > > > > And then I switch to: > > conf.set("spark.shuffle.consolidateFiles", "true") > > .set("spark.shuffle.manager", "SORT") > > > > Then I get the error: > > > > Exception in thread "main" org.apache.spark.SparkException: Job aborted > due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent > failure: Lost task 5.3 in stage 1.0 (TID 36, > ip-10-169-217-47.ec2.internal): com.esotericsoftware.kryo.KryoException: > java.io.IOException: File too large > > at com.esotericsoftware.kryo.io.Output.flush(Output.java:157) > > > > I roughly know the first issue is because Spark shuffle creates too many > local temp files (and I don’t know the solution, because looks like my > solution also cause other issues), but I am not sure what means is the > second error. > > > > Anyone knows the solution for both cases? > > > > Regards, > > > > Shuai >
com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Hi All, I am trying to run a simple sort-by on 1.2.1, and it always gives me the two errors below:

1. 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException: /tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826 (Too many open files)

And then I switch to:

conf.set("spark.shuffle.consolidateFiles", "true")
    .set("spark.shuffle.manager", "SORT")

Then I get the error:

2. Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal): com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)

I roughly know the first issue is caused by the Spark shuffle creating too many local temp files (and I don't know the solution, because my workaround seems to cause other issues), but I am not sure what the second error means. Does anyone know the solution for both cases? Regards, Shuai
Re: Too many open files
Oops, the last reply didn't go to the user list — the mail app's fault. Shuffling happens across the cluster, so you need to change the limit on all the nodes in the cluster.

Sent from my iPhone

> On Aug 30, 2014, at 3:10, Sudha Krishna wrote:
>
> Hi,
>
> Thanks for your response. Do you know if I need to change this limit on all
> the cluster nodes or just the master?
> Thanks
>
>> On Aug 29, 2014 11:43 AM, "Ye Xianjin" wrote:
>> 1024 for the open-file limit is most likely too small for Linux
>> machines in production. Try setting it to 65536, or unlimited if you can. The "too
>> many open files" error occurs because there are a lot of shuffle files (if
>> I'm wrong, please correct me):
>>
>> Sent from my iPhone
>>
>> > On Aug 30, 2014, at 2:06, SK wrote:
>> >
>> > Hi,
>> >
>> > I am having the same problem reported by Michael. I am trying to open 30
>> > files. ulimit -n shows the limit is 1024. So I am not sure why the program
>> > is failing with the "Too many open files" error. The total size of all 30
>> > files is 230 GB.
>> > I am running the job on a cluster with 10 nodes, each having 16 GB. The
>> > error appears to happen at the distinct() stage.
>> >
>> > Here is my program. In the following code, are all 10 nodes trying to
>> > open all of the 30 files, or are the files distributed among the 10 nodes?
>> >
>> >    val baseFile = "/mapr/mapr_dir/files_2013apr*"
>> >    val x = sc.textFile(baseFile).map { line =>
>> >      val fields = line.split("\t")
>> >      (fields(11), fields(6))
>> >    }.distinct().countByKey()
>> >    val xrdd = sc.parallelize(x.toSeq)
>> >    xrdd.saveAsTextFile(...)
>> >
>> > Instead of using the glob *, I guess I can try using a for loop to read the
>> > files one by one if that helps, but I am not sure if there is a more efficient
>> > solution.
>> >
>> > The following is the error transcript:
>> >
>> > Job aborted due to stage failure: Task 1.0:201 failed 4 times, most recent
>> > failure: Exception failure in TID 902 on host 192.168.13.11:
>> > java.io.FileNotFoundException:
>> > /tmp/spark-local-20140829131200-0bb7/08/shuffle_0_201_999 (Too many open
>> > files)
>> > java.io.FileOutputStream.open(Native Method)
>> > java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>> > org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
>> > org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
>> > org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
>> > org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
>> > scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> > org.apache.spark.util.collection.AppendOnlyMap$$anon$1.foreach(AppendOnlyMap.scala:159)
>> > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>> > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>> > org.apache.spark.scheduler.Task.run(Task.scala:51)
>> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> > java.lang.Thread.run(Thread.java:744) Driver stacktrace:
Re: Too many open files
Hi, I am having the same problem reported by Michael. I am trying to open 30 files; "ulimit -n" shows the limit is 1024, so I am not sure why the program is failing with the "Too many open files" error. The total size of all 30 files is 230 GB. I am running the job on a cluster with 10 nodes, each having 16 GB. The error appears to happen at the distinct() stage.

Here is my program. In the following code, are all 10 nodes trying to open all of the 30 files, or are the files distributed among the 10 nodes?

val baseFile = "/mapr/mapr_dir/files_2013apr*"
val x = sc.textFile(baseFile).map { line =>
  val fields = line.split("\t")
  (fields(11), fields(6))
}.distinct().countByKey()
val xrdd = sc.parallelize(x.toSeq)
xrdd.saveAsTextFile(...)

Instead of using the glob *, I guess I can try using a for loop to read the files one by one if that helps, but I am not sure if there is a more efficient solution.

The following is the error transcript:

Job aborted due to stage failure: Task 1.0:201 failed 4 times, most recent failure: Exception failure in TID 902 on host 192.168.13.11: java.io.FileNotFoundException: /tmp/spark-local-20140829131200-0bb7/08/shuffle_0_201_999 (Too many open files)
java.io.FileOutputStream.open(Native Method)
java.io.FileOutputStream.<init>(FileOutputStream.java:221)
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
org.apache.spark.util.collection.AppendOnlyMap$$anon$1.foreach(AppendOnlyMap.scala:159)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)
Driver stacktrace:

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-tp1464p13144.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Sorting data large data- "too many open files" exception
Set the ulimit quite high as root and that should resolve it.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>

On Mon, May 26, 2014 at 7:48 PM, Matt Kielo wrote:
> Hello,
>
> I currently have a task that always fails with
> "java.io.FileNotFoundException: [...]/shuffle_0_257_2155 (Too many open
> files)" when I run sorting operations such as distinct, sortByKey, or
> reduceByKey on a large number of partitions.
>
> I'm working with 365 GB of data, which is being split into 5959 partitions.
> The cluster I'm using has over 1000 GB of memory, with 20 GB of memory per node.
>
> I have tried adding .set("spark.shuffle.consolidateFiles", "true") when
> creating my Spark context, but it doesn't seem to make a difference.
>
> Has anyone else had similar problems?
>
> Best regards,
>
> Matt
Sorting data large data- "too many open files" exception
Hello, I currently have a task that always fails with "java.io.FileNotFoundException: [...]/shuffle_0_257_2155 (Too many open files)" when I run sorting operations such as distinct, sortByKey, or reduceByKey on a large number of partitions. I'm working with 365 GB of data, which is being split into 5959 partitions. The cluster I'm using has over 1000 GB of memory, with 20 GB of memory per node. I have tried adding .set("spark.shuffle.consolidateFiles", "true") when creating my Spark context, but it doesn't seem to make a difference. Has anyone else had similar problems? Best regards, Matt
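When the ulimit cannot be raised, a common workaround for failures like the one above is to pass an explicit partition count to the shuffle operators, since fewer reduce-side partitions means fewer shuffle files open at once. A sketch in Spark's Scala API; the count of 512 is purely illustrative (not tuned for this job), and `pairs`/`records` stand for whatever RDDs the job actually builds:

```scala
// Each shuffle operator accepts an optional numPartitions argument;
// lowering it reduces the number of shuffle files each task holds open.
val counted = pairs.reduceByKey(_ + _, 512)       // instead of 5959 partitions
val sorted  = pairs.sortByKey(true, 512)          // ascending sort, 512 partitions
val unique  = records.distinct(512)
```

The trade-off is larger per-partition spills and less parallelism on the reduce side, so the right count depends on data size per partition.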
Re: "Too many open files" exception on reduceByKey
Sorry, I also have some follow-up questions. "In general if a node in your cluster has C assigned cores and you run a job with X reducers then Spark will open C*X files in parallel and start writing." Some questions came to mind just now: 1) It would be nice to have a brief overview of what these files are being used for. 2) Are these C*X files being opened on each machine? Also, is C the total number of cores among all machines in the cluster? Thanks, -Matt Cheah On Tue, Mar 11, 2014 at 4:35 PM, Matthew Cheah wrote: > Thanks. Just curious, is there a default number of reducers that are used? > > -Matt Cheah > > > On Mon, Mar 10, 2014 at 7:22 PM, Patrick Wendell wrote: > >> Hey Matt, >> >> The best way is definitely just to increase the ulimit if possible, >> this is sort of an assumption we make in Spark that clusters will be >> able to move it around. >> >> You might be able to hack around this by decreasing the number of >> reducers but this could have some performance implications for your >> job. >> >> In general if a node in your cluster has C assigned cores and you run >> a job with X reducers then Spark will open C*X files in parallel and >> start writing. Shuffle consolidation will help decrease the total >> number of files created but the number of file handles open at any >> time doesn't change so it won't help the ulimit problem. >> >> This means you'll have to use fewer reducers (e.g. pass reduceByKey a >> number of reducers) or use fewer cores on each machine. >> >> - Patrick >> >> On Mon, Mar 10, 2014 at 10:41 AM, Matthew Cheah >> wrote: >> > Hi everyone, >> > >> > My team (cc'ed in this e-mail) and I are running a Spark reduceByKey >> > operation on a cluster of 10 slaves where I don't have the privileges >> to set >> > "ulimit -n" to a higher number. I'm running on a cluster where "ulimit >> -n" >> > returns 1024 on each machine.
>> > >> > When I attempt to run this job with the data originating from a text >> file, >> > stored in an HDFS cluster running on the same nodes as the Spark >> cluster, >> > the job crashes with the message, "Too many open files". >> > >> > My question is, why are so many files being created, and is there a way >> to >> > configure the Spark context to avoid spawning that many files? I am >> already >> > setting spark.shuffle.consolidateFiles to true. >> > >> > I want to repeat - I can't change the maximum number of open file >> > descriptors on the machines. This cluster is not owned by me and the >> system >> > administrator is responding quite slowly. >> > >> > Thanks, >> > >> > -Matt Cheah >> > >
Re: "Too many open files" exception on reduceByKey
Thanks. Just curious, is there a default number of reducers that are used? -Matt Cheah On Mon, Mar 10, 2014 at 7:22 PM, Patrick Wendell wrote: > Hey Matt, > > The best way is definitely just to increase the ulimit if possible, > this is sort of an assumption we make in Spark that clusters will be > able to move it around. > > You might be able to hack around this by decreasing the number of > reducers but this could have some performance implications for your > job. > > In general if a node in your cluster has C assigned cores and you run > a job with X reducers then Spark will open C*X files in parallel and > start writing. Shuffle consolidation will help decrease the total > number of files created but the number of file handles open at any > time doesn't change so it won't help the ulimit problem. > > This means you'll have to use fewer reducers (e.g. pass reduceByKey a > number of reducers) or use fewer cores on each machine. > > - Patrick > > On Mon, Mar 10, 2014 at 10:41 AM, Matthew Cheah > wrote: > > Hi everyone, > > > > My team (cc'ed in this e-mail) and I are running a Spark reduceByKey > > operation on a cluster of 10 slaves where I don't have the privileges to > set > > "ulimit -n" to a higher number. I'm running on a cluster where "ulimit > -n" > > returns 1024 on each machine. > > > > When I attempt to run this job with the data originating from a text > file, > > stored in an HDFS cluster running on the same nodes as the Spark cluster, > > the job crashes with the message, "Too many open files". > > > > My question is, why are so many files being created, and is there a way > to > > configure the Spark context to avoid spawning that many files? I am > already > > setting spark.shuffle.consolidateFiles to true. > > > > I want to repeat - I can't change the maximum number of open file > > descriptors on the machines. This cluster is not owned by me and the > system > > administrator is responding quite slowly. > > > > Thanks, > > > > -Matt Cheah >
Re: "Too many open files" exception on reduceByKey
Hey Matt,

The best way is definitely just to increase the ulimit if possible; this is sort of an assumption we make in Spark that clusters will be able to move it around.

You might be able to hack around this by decreasing the number of reducers, but this could have some performance implications for your job.

In general, if a node in your cluster has C assigned cores and you run a job with X reducers, then Spark will open C*X files in parallel and start writing. Shuffle consolidation will help decrease the total number of files created, but the number of file handles open at any time doesn't change, so it won't help the ulimit problem.

This means you'll have to use fewer reducers (e.g. pass reduceByKey a number of reducers) or use fewer cores on each machine.

- Patrick

On Mon, Mar 10, 2014 at 10:41 AM, Matthew Cheah wrote:
> Hi everyone,
>
> My team (cc'ed in this e-mail) and I are running a Spark reduceByKey
> operation on a cluster of 10 slaves where I don't have the privileges to set
> "ulimit -n" to a higher number. I'm running on a cluster where "ulimit -n"
> returns 1024 on each machine.
>
> When I attempt to run this job with the data originating from a text file,
> stored in an HDFS cluster running on the same nodes as the Spark cluster,
> the job crashes with the message, "Too many open files".
>
> My question is, why are so many files being created, and is there a way to
> configure the Spark context to avoid spawning that many files? I am already
> setting spark.shuffle.consolidateFiles to true.
>
> I want to repeat - I can't change the maximum number of open file
> descriptors on the machines. This cluster is not owned by me and the system
> administrator is responding quite slowly.
>
> Thanks,
>
> -Matt Cheah
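The C*X rule above is easy to sanity-check against a given ulimit. A back-of-envelope sketch in Scala; the core and reducer counts are illustrative assumptions, not numbers taken from this thread:

```scala
// C*X rule of thumb: shuffle-file handles a node holds open while writing.
val c = 16               // C: cores assigned to executors on one node (assumed)
val x = 1024             // X: number of reducers / reduce-side partitions (assumed)
val handlesInFlight = c * x
// 16 * 1024 = 16384 handles, far above a 1024 ulimit, so the job fails.
// Cutting reducers, e.g. reduceByKey(f, 32), gives 16 * 32 = 512 handles,
// which fits under the default limit.
println(handlesInFlight)
```

This also makes Patrick's point concrete: consolidation changes how many files exist in total, but not how many are open simultaneously, which is the quantity the ulimit constrains.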
"Too many open files" exception on reduceByKey
Hi everyone, My team (cc'ed in this e-mail) and I are running a Spark reduceByKey operation on a cluster of 10 slaves where I don't have the privileges to set "ulimit -n" to a higher number. I'm running on a cluster where "ulimit -n" returns 1024 on each machine. When I attempt to run this job with the data originating from a text file, stored in an HDFS cluster running on the same nodes as the Spark cluster, the job crashes with the message, "Too many open files". My question is, why are so many files being created, and is there a way to configure the Spark context to avoid spawning that many files? I am already setting spark.shuffle.consolidateFiles to true. I want to repeat - I can't change the maximum number of open file descriptors on the machines. This cluster is not owned by me and the system administrator is responding quite slowly. Thanks, -Matt Cheah
Re: [incubating-0.9.0] Too Many Open Files on Workers
Hello Andy, This is a problem we have seen in using the CQL Java driver under heavy read loads, where it is using NIO and is waiting on many pending responses, which causes too many open sockets and hence too many open files. Are you by any chance using async queries? I am the maintainer of Calliope... Feel free to mail me directly on any issues/queries you have working with Calliope; I will be glad to assist. Cheers, Rohit *Founder & CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform*

On Fri, Feb 21, 2014 at 3:34 PM, andy petrella wrote:
> Mmmmh, good point!
>
> Before answering: I tried to use Calliope but I hit an issue, and since
> the iteration review was near I quickly switched to the DataStax driver.
> But I'll get to Calliope soon, with some questions maybe ;-).
>
> Regarding your point (a very good one, I have to say): actually I'm creating a
> session and a batch per partition.
> Now the shameful part... I haven't set any options for the pool :-/. Are there
> any tuning clues? In my case the C* is local (a docker image), so maybe
> should I do
> builder.poolingOptions().setMaxConnectionsPerHost(LOCAL, BIGNUMBER)?
>
> The point is, what about this BIGNUMBER... can it be really big? (Sounds
> weird to me, but I don't want to pre-filter options based on feelings.)
>
> Thanks for your response
>
> andy
>
> On Fri, Feb 21, 2014 at 10:36 AM, Sourav Chandra <
> sourav.chan...@livestream.com> wrote:
>
>> From the stack trace it looks like you are using the DataStax Cassandra driver
>> and it tried to create a cluster.
>>
>> How many connections are you creating in poolingOptions(), i.e. builder.
>> poolingOptions().setMaxConnectionsPerHost(...)?
>>
>> Are you creating this per RDD? There might be lots of connections
>> created, until at last it failed to create any more.
>>
>> Thanks,
>> Sourav
>>
>> On Fri, Feb 21, 2014 at 3:02 PM, andy petrella wrote:
>>
>>> Hey guys,
>>>
>>> I've got this issue (see bottom) with Spark, deployed in standalone mode
>>> on a local docker environment.
>>> I know that I need to raise the ulimit (only 1024 now), but in the
>>> meantime I was just wondering how this could happen.
>>> My gut feeling is that I'm mounting a lot in memory and Spark tries
>>> to dump some RDDs on the FS, and then boom.
>>>
>>> Also, I was wondering if it could be a clue that my job is maybe too
>>> eager in memory? Or is it something quite normal with such a low ulimit
>>> on workers?
>>>
>>> Thanks a lot (in advance ^^)
>>>
>>> Cheers,
>>> andy
>>>
>>> 14/02/21 08:32:15 ERROR Executor: Exception in task ID 472
>>> org.jboss.netty.channel.ChannelException: Failed to create a selector.
>>> at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:337)
>>> at org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:95)
>>> at org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:53)
>>> at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45)
>>> at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
>>> at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
>>> at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:99)
>>> at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:69)
>>> at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
>>> at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:33)
>>> at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:151)
>>> at
org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:116)
>>> at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:349)
>>> at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:360)
>>> at com.datastax.driver.core.Cluster$Manager.<init>(Cluster.java:857)
>>> at com.datastax.driver.core.Cluster$Manager.<init>(Cluster.java:806)
>>> at com.datastax.driver.core.Cluster.<init>(Cluster.java:76)
>>> at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:132
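The trace shows a new Cluster (and with it new NIO selector threads and sockets) being constructed on the executor. Besides raising the ulimit, a common mitigation for the session-per-partition pattern mentioned above is to share one lazily-initialized Cluster/Session per executor JVM. A hedged sketch using the DataStax Java driver API named in this thread; the contact point is a placeholder and `rdd` stands for whatever RDD is being written:

```scala
import com.datastax.driver.core.Cluster

// One Cluster/Session per JVM: a Scala object is initialized once per
// classloader, so every task on an executor reuses the same connection
// pool instead of opening fresh sockets and selectors per partition.
object CassandraConn {
  lazy val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
  lazy val session = cluster.connect()
}

rdd.foreachPartition { rows =>
  val session = CassandraConn.session  // shared, not rebuilt per partition
  rows.foreach { row =>
    // bind and execute the write for this row using the shared session
  }
}
```

Since the object lives for the lifetime of the executor JVM, the sockets stay well under the file-descriptor limit regardless of how many partitions are processed.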