I have a spark streaming job that read tweets stream from gnip and write it
to Kafak.

Spark and kafka are running on the same cluster. 

My cluster consists of 5 nodes. Kafka-b01 ... Kafka-b05

Spark master is running on Kafak-b05. 

Here is how we submit the spark job

*nohup sh $SPZRK_HOME/bin/spark-submit --total-executor-cores 5 --class
org.css.java.gnipStreaming.GnipSparkStreamer --master spark://kafka-b05:7077
GnipStreamContainer.jar powertrack
kafka-b01.css.org,kafka-b02.css.org,kafka-b03.css.org,kafka-b04.css.org,kafka-b05.css.org
gnip_live_stream 2 &*

After about 1 hour the spark job get killed

The logs in the nohub file shows the following exception

/org.apache.spark.storage.BlockFetchException: Failed to fetch block from 2
locations. Most recent failure cause:
        at
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
        at
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
        at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at
org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
        at 
org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:570)
        at org.apache.spark.storage.BlockManager.get(BlockManager.scala:630)
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:48)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.channel.ChannelException: Unable to create Channel from
class class io.netty.channel.socket.nio.NioSocketChannel
        at
io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:455)
        at
io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:306)
        at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:134)
        at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116)
        at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:211)
        at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
        at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
        at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
        at
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:99)
        at
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
        at
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
        ... 15 more
Caused by: io.netty.channel.ChannelException: Failed to open a socket.
        at
io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:62)
        at
io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:72)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at
io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:453)
        ... 26 more
Caused by: java.net.SocketException: Too many open files
        at sun.nio.ch.Net.socket0(Native Method)
        at sun.nio.ch.Net.socket(Net.java:411)
        at sun.nio.ch.Net.socket(Net.java:404)
        at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:105)
        at
sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
        at
io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:60)
        ... 33 more/


I have increased the maximum number of open files but I am still facing the
same issue. 

When I checked the stderr logs of the workers from spark web interface I
found another exception. 

/java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
        at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at
kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
        at
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:188)
        at
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152)
        at
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
        at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at
kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151)
        at
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
        at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
        at kafka.producer.Producer.send(Producer.scala:77)
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
        at
org.css.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:59)
        at
org.css.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:51)
        at
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
        at
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
        at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)/

The second exception (as it seems) is related to Kafka not spark. 

What do you think the problem is?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Job-get-killed-after-running-for-about-1-hour-tp26823.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to