Also, if you actually want to use Kafka, you're much better off with a
replication factor greater than 1, so that you get leader re-election when a
broker goes down.
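
For example, a new topic with a replication factor of 3 on a 3-broker cluster
could be created with the Kafka 0.8.2 admin API along these lines (a rough
sketch; the ZooKeeper connect string and topic name are placeholders, and
raising the replication factor of an already-existing topic would instead
require a partition reassignment):

  import java.util.Properties
  import kafka.admin.AdminUtils
  import kafka.utils.ZKStringSerializer
  import org.I0Itec.zkclient.ZkClient

  // connect to the ZooKeeper ensemble backing the Kafka cluster
  val zkClient = new ZkClient("zk1:2181,zk2:2181,zk3:2181", 10000, 10000, ZKStringSerializer)
  // 1 partition, replication factor 3: if the leader broker dies, one of the
  // other replicas can be elected leader and consumers keep working
  AdminUtils.createTopic(zkClient, "my-topic", 1, 3, new Properties())
  zkClient.close()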

On Fri, Nov 20, 2015 at 9:20 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Spark-specific questions are better directed to the Spark user list.
>
> Spark will retry failed tasks automatically up to a configurable number of
> times.  The direct stream will retry failures on the driver up to a
> configurable number of times.
>
> See
>
> http://spark.apache.org/docs/latest/configuration.html
>
> The properties you're looking for are
>
> spark.task.maxFailures
> spark.streaming.kafka.maxRetries
>
> respectively.
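>
> For example, these can be set on a SparkConf (a minimal sketch; the retry
> counts below are illustrative, not recommendations):
>
>   import org.apache.spark.SparkConf
>
>   val conf = new SparkConf()
>     .setAppName("kafka-direct-stream")
>     .set("spark.task.maxFailures", "8")            // task retries on executors
>     .set("spark.streaming.kafka.maxRetries", "3")  // driver-side retries when fetching leader offsets
>
> The same keys can also be passed to spark-submit via --conf.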
>
> On Fri, Nov 20, 2015 at 7:12 AM, Charan Ganga Phani Adabala <
> char...@eiqnetworks.com> wrote:
>
>> Hi All,
>>
>> We are using Apache Spark 1.5.1 with kafka_2.10-0.8.2.1 and the Kafka
>> DirectStream API to fetch data from Kafka.
>>
>> Kafka topic properties: replication factor: 1, partitions: 1
>> Kafka cluster size: 3 nodes
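>>
>> For reference, the direct stream is created roughly like this (a minimal
>> sketch; the broker addresses and the existing StreamingContext ssc are
>> placeholders):
>>
>>   import kafka.serializer.StringDecoder
>>   import org.apache.spark.streaming.kafka.KafkaUtils
>>
>>   val kafkaParams = Map("metadata.broker.list" -> "node1:9092,node2:9092,node3:9092")
>>   val topics = Set("normalized-tenant4")
>>   val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>     ssc, kafkaParams, topics)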
>>
>>
>> When all Kafka nodes are up and running, we can successfully fetch the data
>> for all the topics.
>>
>>
>>
>> But when one of the Kafka nodes goes down, we get the exceptions below.
>> Even after the node comes back up, the job does not recover: the Spark job
>> terminates and we are unable to fetch data from the remaining topics in
>> Kafka.
>>
>> ERROR DirectKafkaInputDStream:125 -
>> ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for
>> Set([normalized-tenant4,0]))
>> ERROR JobScheduler:96 - Error generating jobs for time 1447929990000 ms
>> org.apache.spark.SparkException:
>> ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for
>> Set([normalized-tenant4,0]))
>>         at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
>>         at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>>         at
>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>>         at scala.Option.orElse(Option.scala:257)
>>         at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>>         at
>> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>>         at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>>         at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>>         at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>         at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>         at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>         at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>         at
>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>         at
>> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>         at
>> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
>>         at scala.util.Try$.apply(Try.scala:161)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
>>         at org.apache.spark.streaming.scheduler.JobGenerator.org
>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
>>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>>
>> Thanks in advance; please help us resolve this issue.
>>
>> Thanks & Regards,
>>
>> *Ganga Phani Charan Adabala | Software Engineer*
>>
>> o:  +91-40-23116680 | c:  +91-9491418099
>>
>> EiQ Networks, Inc. <http://www.eiqnetworks.com/>
>>
>> *"This email is intended only for the use of the individual or entity
>> named above and may contain information that is confidential and
>> privileged. If you are not the intended recipient, you are hereby notified
>> that any dissemination, distribution or copying of the email is strictly
>> prohibited. If you have received this email in error, please destroy
>> the original message."*
>>
>
>
