Hello All,
I am using Spark 2.0.2 with spark-streaming-kafka-0-10_2.11.

I am setting enable.auto.commit to false and want to commit the
offsets manually after my output operation succeeds, so that when an
exception is raised during processing the offsets are not committed.
But the offsets appear to be committed even when an exception is
raised, and I am thereby losing data.
In my logs I see: WARN overriding enable.auto.commit to false for
executor. But I don't want this setting overridden. Please help.

My code looks like this:

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "Group1",
      "auto.offset.reset" -> offsetresetparameter,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val myTopics = Array("topic1")
    val stream1 = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](myTopics, kafkaParams)
    )

    stream1.foreachRDD { (rdd, time) =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      try {
        // save the rdd to the Cassandra database

        stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      } catch {
        case ex: Exception =>
          println(ex.toString + " !!!!!! Bad Data, Unable to persist into table !!!!! " +
            errorOffsetRangesToString(offsetRanges))
      }
    }

    ssc.start()
    ssc.awaitTermination()
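For completeness, one variant I am considering is to rethrow from the catch block so the batch fails, rather than swallowing the exception. My understanding is that commitAsync only queues the offset ranges, and the actual commit is issued by the driver on a later batch, so after a failed batch nothing further should be committed. This is only a sketch: saveToCassandra here is a stand-in for my real output operation, not an actual API call.

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream1.foreachRDD { (rdd, time) =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  try {
    // stand-in for the real output operation (write the rdd to Cassandra)
    saveToCassandra(rdd)

    // only reached if the save succeeded; the offsets are queued here
    // and committed by the driver during a subsequent batch
    stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  } catch {
    case ex: Exception =>
      println(ex.toString + " !!!!!! Bad Data, Unable to persist into table !!!!! " +
        errorOffsetRangesToString(offsetRanges))
      // fail the batch so no further offsets get queued or committed
      throw ex
  }
}
```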
