If your complaint is about offsets being committed that you didn't expect... auto-commit being false on executors shouldn't have anything to do with that. Executors shouldn't be auto-committing; that's why it's being overridden.
What you've said and the code you posted isn't really enough to explain what your issue is. E.g., is this line (// save the rdd to Cassandra database) a blocking call? Are you sure that the rdd foreach isn't being retried and succeeding the second time around? Etc.

On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande <deshpandesh...@gmail.com> wrote:

> Hello All,
> I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11.
>
> I am setting enable.auto.commit to false, and manually want to commit the
> offsets after my output operation is successful. So when an exception is
> raised during processing I do not want the offsets to be committed. But it
> looks like the offsets are automatically committed even when the exception
> is raised, and thereby I am losing data.
> In my logs I see: WARN overriding enable.auto.commit to false for
> executor. But I don't want it to override. Please help.
>
> My code looks like:
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> brokers,
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "group.id" -> "Group1",
>   "auto.offset.reset" -> offsetresetparameter,
>   "enable.auto.commit" -> (false: java.lang.Boolean)
> )
>
> val myTopics = Array("topic1")
> val stream1 = KafkaUtils.createDirectStream[String, String](
>   ssc,
>   PreferConsistent,
>   Subscribe[String, String](myTopics, kafkaParams)
> )
>
> stream1.foreachRDD { (rdd, time) =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   try {
>     // save the rdd to Cassandra database
>
>     stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>   } catch {
>     case ex: Exception =>
>       println(ex.toString + "!!!!!! Bad Data, Unable to persist into table !!!!!" + errorOffsetRangesToString(offsetRanges))
>   }
> }
>
> ssc.start()
> ssc.awaitTermination()
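To make the blocking-call point concrete, here is a minimal plain-Scala sketch of the commit-only-on-success pattern (no Spark involved, so it runs standalone). Everything here is a hypothetical stand-in: `saveBatch` simulates the blocking Cassandra write and throws on bad data, and `committed` stands in for the consumer's committed-offset state that `commitAsync` would update.

```scala
object CommitOnSuccessSketch {
  // Hypothetical stand-in for the blocking Cassandra save; throws on bad data.
  def saveBatch(records: Seq[String]): Unit =
    if (records.exists(_.contains("bad")))
      throw new RuntimeException("Bad Data, Unable to persist")

  // Stand-in for the committed-offset state held by the Kafka consumer.
  var committed: Option[Long] = None

  // Commit the batch's end offset only if the save returned without throwing.
  def processBatch(records: Seq[String], endOffset: Long): Unit =
    try {
      saveBatch(records)          // must be a *blocking* call for this to work
      committed = Some(endOffset) // reached only on success
    } catch {
      case ex: Exception =>
        println(s"Not committing offsets up to $endOffset: ${ex.getMessage}")
    }

  def main(args: Array[String]): Unit = {
    processBatch(Seq("ok1", "ok2"), endOffset = 2L)
    println(committed) // Some(2)
    processBatch(Seq("ok3", "bad4"), endOffset = 4L)
    println(committed) // still Some(2): the failed batch was never committed
  }
}
```

The key property is that the commit statement sits after the save inside the same try block, so an exception in the save skips the commit entirely. If the save is asynchronous (returns before the write is durable), this ordering guarantees nothing, which is why whether the Cassandra call blocks matters so much.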