Which version of Spark are you using ? How did you increase the open file limit ?
Which operating system do you use ? Please see Example 6. ulimit Settings on Ubuntu under: http://hbase.apache.org/book.html#basic.prerequisites On Sun, Apr 24, 2016 at 2:34 AM, fanooos <dev.fano...@gmail.com> wrote: > I have a spark streaming job that read tweets stream from gnip and write it > to Kafak. > > Spark and kafka are running on the same cluster. > > My cluster consists of 5 nodes. Kafka-b01 ... Kafka-b05 > > Spark master is running on Kafak-b05. > > Here is how we submit the spark job > > *nohup sh $SPZRK_HOME/bin/spark-submit --total-executor-cores 5 --class > org.css.java.gnipStreaming.GnipSparkStreamer --master > spark://kafka-b05:7077 > GnipStreamContainer.jar powertrack > kafka-b01.css.org,kafka-b02.css.org,kafka-b03.css.org,kafka-b04.css.org, > kafka-b05.css.org > gnip_live_stream 2 &* > > After about 1 hour the spark job get killed > > The logs in the nohub file shows the following exception > > /org.apache.spark.storage.BlockFetchException: Failed to fetch block from 2 > locations. Most recent failure cause: > at > > org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595) > at > > org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585) > at > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585) > at > org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:570) > at > org.apache.spark.storage.BlockManager.get(BlockManager.scala:630) > at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:48) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) > at > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: io.netty.channel.ChannelException: Unable to create Channel from > class class io.netty.channel.socket.nio.NioSocketChannel > at > > io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:455) > at > > io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:306) > at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:134) > at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116) > at > > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:211) > at > > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167) > at > > org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90) > at > > org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) > at > > org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) > at > > org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:99) > at > > org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89) > at > > org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588) > ... 15 more > Caused by: io.netty.channel.ChannelException: Failed to open a socket. > at > > io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:62) > at > > io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:72) > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) > at > > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at java.lang.Class.newInstance(Class.java:442) > at > > io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:453) > ... 26 more > Caused by: java.net.SocketException: Too many open files > at sun.nio.ch.Net.socket0(Native Method) > at sun.nio.ch.Net.socket(Net.java:411) > at sun.nio.ch.Net.socket(Net.java:404) > at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:105) > at > > sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60) > at > > io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:60) > ... 33 more/ > > > I have increased the maximum number of open files but I am still facing the > same issue. > > When I checked the stderr logs of the workers from spark web interface I > found another exception. > > /java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) > at > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74) > at kafka.producer.SyncProducer.send(SyncProducer.scala:119) > at > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > at > > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:188) > at > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152) > at > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > at > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151) > at > > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > at > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > at kafka.producer.Producer.send(Producer.scala:77) > at kafka.javaapi.producer.Producer.send(Producer.scala:33) > at > > org.css.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:59) > at > > org.css.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:51) > at > > org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225) > at > > org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225) > at > > org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) > at > > org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) > at > > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) > at > > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) > at > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745)/ > > The second exception (as it seems) is related to Kafka not spark. > > What do you think the problem is? > > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Job-get-killed-after-running-for-about-1-hour-tp26823.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >