We have similar jobs consuming from Kafka and writing to Elasticsearch, and the culprit is usually either not enough memory for the executors or the driver, or not enough executors in general to process the job. Try using dynamic allocation if you're not sure how many cores/executors you actually need for this job.
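Something along these lines (untested; the min/max values are illustrative, and dynamic allocation also requires the external shuffle service to be enabled on each worker):

    $SPARK_HOME/bin/spark-submit \
      --master spark://kafka-b05:7077 \
      --conf spark.dynamicAllocation.enabled=true \
      --conf spark.shuffle.service.enabled=true \
      --conf spark.dynamicAllocation.minExecutors=1 \
      --conf spark.dynamicAllocation.maxExecutors=5 \
      --class org.css.java.gnipStreaming.GnipSparkStreamer \
      GnipStreamContainer.jar <application arguments>

Note that --total-executor-cores is dropped so the scheduler can scale executors up and down on its own.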
Sent from Outlook for iPhone

On Sun, Apr 24, 2016 at 2:34 AM -0700, "fanooos" <dev.fano...@gmail.com> wrote:

I have a Spark Streaming job that reads a tweet stream from GNIP and writes it to Kafka. Spark and Kafka are running on the same cluster, which consists of 5 nodes: kafka-b01 ... kafka-b05. The Spark master is running on kafka-b05.

Here is how we submit the Spark job:

    nohup sh $SPARK_HOME/bin/spark-submit --total-executor-cores 5 --class org.css.java.gnipStreaming.GnipSparkStreamer --master spark://kafka-b05:7077 GnipStreamContainer.jar powertrack kafka-b01.css.org,kafka-b02.css.org,kafka-b03.css.org,kafka-b04.css.org,kafka-b05.css.org gnip_live_stream 2 &

After about 1 hour the Spark job gets killed. The logs in the nohup file show the following exception:

    org.apache.spark.storage.BlockFetchException: Failed to fetch block from 2 locations. Most recent failure cause:
        at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
        at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
        at org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:570)
        at org.apache.spark.storage.BlockManager.get(BlockManager.scala:630)
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:48)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: io.netty.channel.ChannelException: Unable to create Channel from class class io.netty.channel.socket.nio.NioSocketChannel
        at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:455)
        at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:306)
        at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:134)
        at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:211)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
        at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
        at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:99)
        at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
        at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
        ... 15 more
    Caused by: io.netty.channel.ChannelException: Failed to open a socket.
        at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:62)
        at io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:72)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:453)
        ... 26 more
    Caused by: java.net.SocketException: Too many open files
        at sun.nio.ch.Net.socket0(Native Method)
        at sun.nio.ch.Net.socket(Net.java:411)
        at sun.nio.ch.Net.socket(Net.java:404)
        at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:105)
        at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
        at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:60)
        ... 33 more

I have increased the maximum number of open files, but I am still facing the same issue.
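For reference, this is roughly what I changed (the limit value is just what I picked):

    # check the limit seen by the user running the Spark workers
    ulimit -n

    # raise it persistently in /etc/security/limits.conf, e.g.:
    #   <spark-user>  soft  nofile  65536
    #   <spark-user>  hard  nofile  65536

The workers were restarted afterwards so that they pick up the new limit.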
When I checked the stderr logs of the workers in the Spark web interface, I found another exception:

    java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:188)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
        at kafka.producer.Producer.send(Producer.scala:77)
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
        at org.css.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:59)
        at org.css.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:51)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

The second exception, as it seems, is related to Kafka rather than Spark. What do you think the problem is?
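One thing worth checking in GnipSparkStreamer.java, since the trace goes through foreachPartition into kafka.javaapi.producer.Producer: if a producer is created per partition (or per record) and never closed, every leaked SyncProducer keeps a socket open, which would also explain the earlier "Too many open files" error. A minimal sketch of the create-use-close pattern with the old producer API (the class and variable names here are illustrative, not taken from the actual job):

    import java.util.Iterator;
    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public final class PartitionWriter {

        // Called once per RDD partition, e.g. from rdd.foreachPartition(...).
        public static void sendPartition(Iterator<String> records,
                                         String brokerList, String topic) {
            Properties props = new Properties();
            props.put("metadata.broker.list", brokerList);
            props.put("serializer.class", "kafka.serializer.StringEncoder");

            // One producer per partition, not one per record.
            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));
            try {
                while (records.hasNext()) {
                    producer.send(new KeyedMessage<String, String>(topic, records.next()));
                }
            } finally {
                // Releases the broker sockets; leaking these eventually
                // exhausts the file-descriptor limit.
                producer.close();
            }
        }
    }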