I am using the latest Spark version, 1.6.

I increased the system-wide maximum number of open files with this command:
*sysctl -w fs.file-max=3275782*
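Note that fs.file-max is the kernel-wide ceiling on open files, whereas "Too many open files" is normally raised when a single process reaches its own per-process limit (RLIMIT_NOFILE). The following is a minimal Python sketch for checking both on Linux. The helper names fd_report and system_file_max are made up for illustration, and the numbers it prints describe the Python process that runs it, not the Spark executor JVM.

```python
import os
import resource


def fd_report():
    """Return (soft_limit, hard_limit, open_now) for the current process."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    # /proc/self/fd holds one entry per open descriptor (Linux only).
    open_now = len(os.listdir("/proc/self/fd"))
    return soft, hard, open_now


def system_file_max(path="/proc/sys/fs/file-max"):
    """Return the kernel-wide ceiling set by fs.file-max, or None if unreadable."""
    try:
        with open(path) as f:
            return int(f.read().strip())
    except (OSError, ValueError):
        return None


if __name__ == "__main__":
    soft, hard, open_now = fd_report()
    print(f"per-process: soft={soft} hard={hard} open now={open_now}")
    print(f"system-wide fs.file-max={system_file_max()}")
```

Run it as the same user, and from the same kind of shell or init script, that launches the Spark worker. For an executor that is already running, cat /proc/<pid>/limits (with <pid> replaced by the executor's process id) shows the limits that process actually received.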
I also raised the limit for the user who runs the Spark job by updating /etc/security/limits.conf. The soft limit is 1024 and the hard limit is 65536.

The operating system is Red Hat Enterprise Linux Server release 6.6 (Santiago).

@Rodrick: I will try increasing the assigned memory and see whether it helps.

Best regards

On 24 April 2016 at 16:42, Ted Yu <yuzhih...@gmail.com> wrote:

> Which version of Spark are you using ?
>
> How did you increase the open file limit ?
>
> Which operating system do you use ?
>
> Please see Example 6. ulimit Settings on Ubuntu under:
> http://hbase.apache.org/book.html#basic.prerequisites
>
> On Sun, Apr 24, 2016 at 2:34 AM, fanooos <dev.fano...@gmail.com> wrote:
>
>> I have a Spark Streaming job that reads a tweet stream from Gnip and
>> writes it to Kafka.
>>
>> Spark and Kafka are running on the same cluster.
>>
>> My cluster consists of 5 nodes: Kafka-b01 ... Kafka-b05.
>>
>> The Spark master is running on Kafka-b05.
>>
>> Here is how we submit the Spark job:
>>
>> *nohup sh $SPARK_HOME/bin/spark-submit --total-executor-cores 5 --class
>> org.css.java.gnipStreaming.GnipSparkStreamer --master
>> spark://kafka-b05:7077
>> GnipStreamContainer.jar powertrack
>> kafka-b01.css.org,kafka-b02.css.org,kafka-b03.css.org,kafka-b04.css.org,kafka-b05.css.org
>> gnip_live_stream 2 &*
>>
>> After about 1 hour the Spark job gets killed.
>>
>> The logs in the nohup file show the following exception:
>>
>> org.apache.spark.storage.BlockFetchException: Failed to fetch block from 2 locations.
>> Most recent failure cause:
>>     at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
>>     at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>     at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
>>     at org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:570)
>>     at org.apache.spark.storage.BlockManager.get(BlockManager.scala:630)
>>     at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:48)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: io.netty.channel.ChannelException: Unable to create Channel from class class io.netty.channel.socket.nio.NioSocketChannel
>>     at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:455)
>>     at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:306)
>>     at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:134)
>>     at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116)
>>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:211)
>>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
>>     at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
>>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>>     at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:99)
>>     at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
>>     at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
>>     ... 15 more
>> Caused by: io.netty.channel.ChannelException: Failed to open a socket.
>>     at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:62)
>>     at io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:72)
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>     at java.lang.Class.newInstance(Class.java:442)
>>     at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:453)
>>     ... 26 more
>> Caused by: java.net.SocketException: Too many open files
>>     at sun.nio.ch.Net.socket0(Native Method)
>>     at sun.nio.ch.Net.socket(Net.java:411)
>>     at sun.nio.ch.Net.socket(Net.java:404)
>>     at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:105)
>>     at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
>>     at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:60)
>>     ... 33 more
>>
>> I have increased the maximum number of open files but I am still facing
>> the same issue.
>>
>> When I checked the stderr logs of the workers from spark web interface I
>> found another exception.
>>
>> java.nio.channels.ClosedChannelException
>>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
>>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
>>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
>>     at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
>>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
>>     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>>     at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
>>     at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:188)
>>     at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152)
>>     at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>     at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151)
>>     at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
>>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
>>     at kafka.producer.Producer.send(Producer.scala:77)
>>     at kafka.javaapi.producer.Producer.send(Producer.scala:33)
>>     at org.css.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:59)
>>     at org.css.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:51)
>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
>>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
>>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
>>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> The second exception (as it seems) is related to Kafka not spark.
>>
>> What do you think the problem is?
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Job-get-killed-after-running-for-about-1-hour-tp26823.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

--
Anas Rabei
Senior Software Developer
Mubasher.info
anas.ra...@mubasher.info