Also, if you actually want to use Kafka, you're much better off with a replication factor greater than 1, so you get leader re-election.
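For example, something along these lines would create the topic with a replication factor of 3. This is only a rough sketch against the kafka.admin.AdminUtils API that ships with Kafka 0.8.2; the ZooKeeper address, partition count, and replica count are placeholders, not values taken from your setup:

import java.util.Properties

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

object CreateReplicatedTopic {
  def main(args: Array[String]): Unit = {
    // Connect to the ZooKeeper ensemble backing the Kafka cluster (placeholder address).
    val zkClient = new ZkClient("zookeeper-1:2181", 30000, 30000, ZKStringSerializer)
    try {
      // 1 partition as in the original setup, but 3 replicas so a surviving
      // broker can be elected leader when one node goes down.
      AdminUtils.createTopic(zkClient, "normalized-tenant4", 1, 3, new Properties())
    } finally {
      zkClient.close()
    }
  }
}

With one replica per partition there is nothing to fail over to, so losing the broker that hosts that single replica leaves the partition without a leader until that broker comes back.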
On Fri, Nov 20, 2015 at 9:20 AM, Cody Koeninger <c...@koeninger.org> wrote:
> Spark-specific questions are better directed to the Spark user list.
>
> Spark will retry failed tasks automatically up to a configurable number of times. The direct stream will retry failures on the driver up to a configurable number of times.
>
> See http://spark.apache.org/docs/latest/configuration.html
>
> The properties you're looking for are spark.task.maxFailures and spark.streaming.kafka.maxRetries, respectively.
>
> On Fri, Nov 20, 2015 at 7:12 AM, Charan Ganga Phani Adabala <char...@eiqnetworks.com> wrote:
>> Hi All,
>>
>> We are using Apache Spark 1.5.1, kafka_2.10-0.8.2.1, and the Kafka DirectStream API to fetch data from Kafka with Spark.
>>
>> Kafka topic properties: replication factor: 1, partitions: 1
>> Kafka cluster size: 3 nodes
>>
>> When all Kafka nodes are up and running, we can successfully get the data for all topics.
>>
>> But when one of the Kafka nodes goes down, we get the exceptions below. Even after the node comes back up, the job does not recover: the Spark job terminates and we are unable to fetch data from the remaining topics in Kafka.
>>
>> ERROR DirectKafkaInputDStream:125 - ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0]))
>> ERROR JobScheduler:96 - Error generating jobs for time 1447929990000 ms
>> org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0]))
>>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
>>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>>   at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>>   at scala.Option.orElse(Option.scala:257)
>>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>>   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>   at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
>>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
>>   at scala.util.Try$.apply(Try.scala:161)
>>   at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
>>   at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
>>   at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>>   at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
>>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>> Thanks in advance. Please help us resolve this issue.
>>
>> Thanks & Regards,
>> Ganga Phani Charan Adabala | Software Engineer
>> o: +91-40-23116680 | c: +91-9491418099
>> EiQ Networks, Inc. <http://www.eiqnetworks.com/>
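For the spark.task.maxFailures and spark.streaming.kafka.maxRetries properties mentioned above, here is a minimal sketch of how they might be wired into the direct stream. The broker addresses, batch interval, app name, and retry values are placeholders chosen for illustration, not recommendations:

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamRetrySketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("direct-stream-retry-sketch")
      // How many times a failed task is retried on the executors.
      .set("spark.task.maxFailures", "8")
      // How many times the driver retries the leader-offset lookup
      // (the latestLeaderOffsets call in the stack trace above) per batch.
      .set("spark.streaming.kafka.maxRetries", "3")

    val ssc = new StreamingContext(conf, Seconds(10))

    // List all three brokers so metadata can still be fetched while one is down.
    val kafkaParams = Map(
      "metadata.broker.list" -> "kafka-1:9092,kafka-2:9092,kafka-3:9092")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("normalized-tenant4"))

    // Placeholder processing: just count each batch.
    stream.map(_._2).foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}

Note that retries alone cannot help when the only replica of a partition lives on the broker that is down: there is no leader to find until that broker returns, which is why raising the replication factor is the real fix here.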