We have similar jobs consuming from Kafka and writing to Elasticsearch, and the
culprit is usually either not enough memory for the executors or the driver, or
simply not enough executors to process the job. Try dynamic allocation if you
are not sure how many cores/executors this job actually needs; see the sketch
below.
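
A minimal sketch of what that looks like on a standalone master (the executor
counts and placeholders are illustrative, and on standalone, dynamic allocation
also requires the external shuffle service to be enabled and running on every
worker):

    spark-submit \
      --master spark://<master-host>:7077 \
      --conf spark.dynamicAllocation.enabled=true \
      --conf spark.shuffle.service.enabled=true \
      --conf spark.dynamicAllocation.minExecutors=1 \
      --conf spark.dynamicAllocation.maxExecutors=5 \
      --class <your.main.Class> your-app.jar <args>

With these settings Spark grows and shrinks the executor count between the min
and max based on the pending task backlog, instead of pinning it up front with
--total-executor-cores.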

On Sun, Apr 24, 2016 at 2:34 AM -0700, "fanooos" <dev.fano...@gmail.com> wrote:

I have a Spark Streaming job that reads the tweets stream from Gnip and writes
it to Kafka.

Spark and Kafka are running on the same cluster.

My cluster consists of 5 nodes, kafka-b01 ... kafka-b05.

The Spark master is running on kafka-b05.

Here is how we submit the Spark job:

nohup sh $SPARK_HOME/bin/spark-submit --total-executor-cores 5 --class
org.css.java.gnipStreaming.GnipSparkStreamer --master spark://kafka-b05:7077
GnipStreamContainer.jar powertrack
kafka-b01.css.org,kafka-b02.css.org,kafka-b03.css.org,kafka-b04.css.org,kafka-b05.css.org
gnip_live_stream 2 &

After about one hour, the Spark job gets killed.

The logs in the nohup file show the following exception:

org.apache.spark.storage.BlockFetchException: Failed to fetch block from 2
locations. Most recent failure cause:
        at
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
        at
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
        at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at
org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
        at 
org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:570)
        at org.apache.spark.storage.BlockManager.get(BlockManager.scala:630)
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:48)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.channel.ChannelException: Unable to create Channel from
class class io.netty.channel.socket.nio.NioSocketChannel
        at
io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:455)
        at
io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:306)
        at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:134)
        at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116)
        at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:211)
        at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
        at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
        at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
        at
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:99)
        at
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
        at
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
        ... 15 more
Caused by: io.netty.channel.ChannelException: Failed to open a socket.
        at
io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:62)
        at
io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:72)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at
io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:453)
        ... 26 more
Caused by: java.net.SocketException: Too many open files
        at sun.nio.ch.Net.socket0(Native Method)
        at sun.nio.ch.Net.socket(Net.java:411)
        at sun.nio.ch.Net.socket(Net.java:404)
        at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:105)
        at
sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
        at
io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:60)
        ... 33 more


I have increased the maximum number of open files, but I am still facing the
same issue.
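
For reference, here is how the limit and the descriptors actually held by a
running executor can be checked on a worker (the pid is a placeholder; note
that a raised limit only takes effect for worker processes restarted under the
user that runs them):

    # limit for the current user/shell
    ulimit -n

    # limit actually in effect for a running executor process
    grep "open files" /proc/<pid>/limits

    # number of file descriptors that process currently holds
    ls /proc/<pid>/fd | wc -l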

When I checked the stderr logs of the workers from the Spark web interface, I
found another exception:

java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
        at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at
kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
        at
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:188)
        at
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152)
        at
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
        at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at
kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151)
        at
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
        at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
        at kafka.producer.Producer.send(Producer.scala:77)
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
        at
org.css.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:59)
        at
org.css.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:51)
        at
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
        at
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
        at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

The second exception seems to be related to Kafka, not Spark.
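
For context, the producer side follows this general pattern: a simplified
sketch with illustrative names, not the exact code, using the old
kafka.javaapi.producer API that the stack trace shows. If the producer is not
closed per partition, the leaked sockets would also explain the "Too many open
files" error above:

    import java.util.Iterator;
    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;

    // Simplified sketch; class and variable names are illustrative.
    public class KafkaWriterSketch {
        public static void writeToKafka(JavaRDD<String> tweets,
                                        final String brokerList,
                                        final String topic) {
            tweets.foreachPartition(new VoidFunction<Iterator<String>>() {
                @Override
                public void call(Iterator<String> partition) {
                    Properties props = new Properties();
                    // old (0.8.x) producer API, matching the stack trace
                    props.put("metadata.broker.list", brokerList);
                    props.put("serializer.class", "kafka.serializer.StringEncoder");
                    Producer<String, String> producer =
                            new Producer<String, String>(new ProducerConfig(props));
                    try {
                        while (partition.hasNext()) {
                            producer.send(new KeyedMessage<String, String>(
                                    topic, partition.next()));
                        }
                    } finally {
                        // without this, every task leaks the producer's
                        // sockets, which eventually surfaces as
                        // "Too many open files"
                        producer.close();
                    }
                }
            });
        }
    }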

What do you think the problem is?




