Hello All, I am using Spark 2.0.2 and spark-streaming-kafka-0-10_2.11. I am setting enable.auto.commit to false, and I want to commit the offsets manually only after my output operation has succeeded. So when an exception is raised during processing, I do not want the offsets to be committed. But it looks like the offsets are being committed even when the exception is raised, and I am thereby losing data. In my logs I see: WARN overriding enable.auto.commit to false for executor. But I don't want it to be overridden. Please help.
My code looks like:

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "Group1",
      "auto.offset.reset" -> offsetresetparameter,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val myTopics = Array("topic1")

    val stream1 = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](myTopics, kafkaParams)
    )

    stream1.foreachRDD { (rdd, time) =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      try {
        // save the rdd to Cassandra database
        stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      } catch {
        case ex: Exception => {
          println(ex.toString + "!!!!!! Bad Data, Unable to persist into table !!!!!"
            + errorOffsetRangesToString(offsetRanges))
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
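One thing worth noting: commitAsync does not commit immediately; it only queues the offsets, and the driver's consumer commits them on a later poll. So even with the try/catch above the guarantee is at-least-once, not "commit only on success" in a transactional sense. If you need the offsets to advance exactly in step with what was written, the integration guide's alternative is to keep the offsets in your own data store alongside the results and seed the stream from them with Assign instead of Subscribe. A minimal sketch of that pattern, assuming hypothetical helpers loadOffsetsFromStore / saveBatchAndOffsets that you would back with your Cassandra tables:

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils}

    // Hypothetical: read the last successfully persisted offsets, e.g. from
    // a Cassandra table keyed by (topic, partition).
    val fromOffsets: Map[TopicPartition, Long] = loadOffsetsFromStore()

    // Seed the stream from the stored offsets rather than the consumer
    // group's committed position, so Kafka's committed offsets no longer
    // matter for recovery.
    val stream1 = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
    )

    stream1.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Hypothetical: write the batch and its offset ranges to the store
      // in one operation (or at least offsets strictly after the data).
      // If this throws, nothing is recorded and the batch is replayed
      // from the same offsets after a restart.
      saveBatchAndOffsets(rdd, offsetRanges)
    }

This is a sketch under those assumptions, not a drop-in fix; the key design choice is that recovery reads offsets from the same store the data went to, so a failed batch can never advance them.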