Yes

On Mon, Aug 7, 2017 at 12:32 PM, shyla deshpande <deshpandesh...@gmail.com> wrote:
> Thanks Cody again.
>
> No. I am doing a mapping of the Kafka ConsumerRecord to be able to save it
> in the Cassandra table, and saveToCassandra is an action, so my data does
> get saved into Cassandra. It works as expected 99% of the time, except that
> when there is an exception, I did not want the offsets to be committed.
>
> By filtering for unsuccessful attempts, do you mean filtering out the bad
> records?
>
> On Mon, Aug 7, 2017 at 9:59 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> If literally all you are doing is rdd.map, I wouldn't expect
>> saveToCassandra to happen at all, since map is not an action.
>>
>> Filtering for unsuccessful attempts and collecting those back to the
>> driver would be one way for the driver to know whether it was safe to
>> commit.
>>
>> On Mon, Aug 7, 2017 at 12:31 AM, shyla deshpande
>> <deshpandesh...@gmail.com> wrote:
>>> rdd.map { record => () }.saveToCassandra("keyspace1", "table1") --> is
>>> running on the executors.
>>>
>>> stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) --> is
>>> running on the driver.
>>>
>>> Is this the reason why the Kafka offsets are committed even when an
>>> exception is raised? If so, is there a way to commit the offsets only
>>> when there are no exceptions?
>>>
>>> On Sun, Aug 6, 2017 at 10:23 PM, shyla deshpande
>>> <deshpandesh...@gmail.com> wrote:
>>>>
>>>> Thanks again Cody,
>>>>
>>>> My understanding is that all the code inside foreachRDD runs on the
>>>> driver, except for
>>>> rdd.map { record => () }.saveToCassandra("keyspace1", "table1").
>>>>
>>>> When the exception is raised, I was thinking I wouldn't be committing
>>>> the offsets, but the offsets are committed every time, independent of
>>>> whether an exception was raised or not.
>>>>
>>>> It would be helpful if you could explain this behavior.
>>>>
>>>> On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>>
>>>>> I mean that the Kafka consumers running on the executors should not
>>>>> be automatically committing, because the fact that a message was
>>>>> read by the consumer has no bearing on whether it was actually
>>>>> successfully processed after reading.
>>>>>
>>>>> It sounds to me like you're confused about where code is running.
>>>>> foreachRDD runs on the driver, not the executor.
>>>>>
>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>>>>>
>>>>> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
>>>>> <deshpandesh...@gmail.com> wrote:
>>>>>> Thanks Cody for your response.
>>>>>>
>>>>>> All I want to do is commit the offsets only if I am successfully
>>>>>> able to write to the Cassandra database.
>>>>>>
>>>>>> The line //save the rdd to Cassandra database is
>>>>>> rdd.map { record => () }.saveToCassandra("keyspace1", "table1")
>>>>>>
>>>>>> What do you mean by "Executors shouldn't be auto-committing, that's
>>>>>> why it's being overridden"? It is the executors that do the mapping
>>>>>> and saving to Cassandra. The status of success or failure of this
>>>>>> operation is known only on the executor, and that's where I want to
>>>>>> commit the Kafka offsets. If this is not what I should be doing,
>>>>>> then what is the right way?
>>>>>>
>>>>>> On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger <c...@koeninger.org>
>>>>>> wrote:
>>>>>>>
>>>>>>> If your complaint is about offsets being committed that you didn't
>>>>>>> expect... auto commit being false on executors shouldn't have
>>>>>>> anything to do with that. Executors shouldn't be auto-committing;
>>>>>>> that's why it's being overridden.
>>>>>>>
>>>>>>> What you've said and the code you posted isn't really enough to
>>>>>>> explain what your issue is, e.g.:
>>>>>>>
>>>>>>> Is this line
>>>>>>> // save the rdd to Cassandra database
>>>>>>> a blocking call?
>>>>>>>
>>>>>>> Are you sure that the rdd foreach isn't being retried and
>>>>>>> succeeding the second time around, etc.?
>>>>>>>
>>>>>>> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
>>>>>>> <deshpandesh...@gmail.com> wrote:
>>>>>>>> Hello All,
>>>>>>>> I am using Spark 2.0.2 and spark-streaming-kafka-0-10_2.11.
>>>>>>>>
>>>>>>>> I am setting enable.auto.commit to false, and want to manually
>>>>>>>> commit the offsets after my output operation is successful. So
>>>>>>>> when an exception is raised during the processing, I do not want
>>>>>>>> the offsets to be committed. But it looks like the offsets are
>>>>>>>> automatically committed even when an exception is raised, and
>>>>>>>> thereby I am losing data.
>>>>>>>> In my logs I see: WARN overriding enable.auto.commit to false for
>>>>>>>> executor. But I don't want it to be overridden. Please help.
>>>>>>>>
>>>>>>>> My code looks like this:
>>>>>>>>
>>>>>>>> val kafkaParams = Map[String, Object](
>>>>>>>>   "bootstrap.servers" -> brokers,
>>>>>>>>   "key.deserializer" -> classOf[StringDeserializer],
>>>>>>>>   "value.deserializer" -> classOf[StringDeserializer],
>>>>>>>>   "group.id" -> "Group1",
>>>>>>>>   "auto.offset.reset" -> offsetresetparameter,
>>>>>>>>   "enable.auto.commit" -> (false: java.lang.Boolean)
>>>>>>>> )
>>>>>>>>
>>>>>>>> val myTopics = Array("topic1")
>>>>>>>> val stream1 = KafkaUtils.createDirectStream[String, String](
>>>>>>>>   ssc,
>>>>>>>>   PreferConsistent,
>>>>>>>>   Subscribe[String, String](myTopics, kafkaParams)
>>>>>>>> )
>>>>>>>>
>>>>>>>> stream1.foreachRDD { (rdd, time) =>
>>>>>>>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>>>>   try {
>>>>>>>>     // save the rdd to the Cassandra database
>>>>>>>>     stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>>>>>>>>   } catch {
>>>>>>>>     case ex: Exception =>
>>>>>>>>       println(ex.toString + " !!!!! Bad data, unable to persist into table !!!!! " + errorOffsetRangesToString(offsetRanges))
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> ssc.start()
>>>>>>>> ssc.awaitTermination()
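A note on the code at the bottom of the thread: the try/catch can only gate the commit if a real, blocking save runs before commitAsync. Below is a minimal cleanup of that loop, not the poster's actual code; it assumes the records are written as (key, value) pairs to a table with matching columns ("key" and "value" are placeholder names, not from the thread).

import com.datastax.spark.connector._  // adds saveToCassandra and SomeColumns
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream1.foreachRDD { (rdd, time) =>
  // This block runs on the driver; only the closures given to rdd.map
  // and the connector's write tasks run on the executors.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  try {
    // saveToCassandra is an action: it blocks until the Spark job
    // finishes, so a task failure surfaces here as an exception.
    rdd.map(record => (record.key, record.value))
      .saveToCassandra("keyspace1", "table1", SomeColumns("key", "value"))

    // Reached only if the save succeeded. Note commitAsync queues the
    // ranges; the actual Kafka commit happens when a later batch runs.
    stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  } catch {
    case ex: Exception =>
      println(s"Not committing ${offsetRanges.mkString(", ")}: $ex")
  }
}

One caveat worth stating: skipping commitAsync for a single failed batch does not by itself replay that data, because a commit from a later successful batch moves the consumer group's offset past the failed range. If a batch must never be skipped, the safer options are to stop the context on failure or to route bad records to a separate store.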
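To make the filter-and-collect suggestion from Cody's Aug 7 reply concrete, here is a minimal sketch. It assumes a hypothetical per-record writeToCassandra helper (not part of the thread) in place of the connector's bulk saveToCassandra call, so that one bad record becomes a value the driver can inspect rather than an exception that fails the task.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
import scala.util.{Failure, Success, Try}

// Hypothetical per-record writer; in practice this might use the
// connector's CassandraConnector.withSessionDo with a prepared statement.
def writeToCassandra(record: ConsumerRecord[String, String]): Unit = ???

stream1.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Runs on the executors: wrap each write in Try so a failure becomes
  // a description of the failed record instead of a thrown exception.
  val failed = rdd.flatMap { record =>
    Try(writeToCassandra(record)) match {
      case Success(_) => None
      case Failure(e) =>
        Some(s"${record.topic}-${record.partition}@${record.offset}: $e")
    }
  }.collect() // action: forces the writes, returns only failures to the driver

  if (failed.isEmpty)
    // Commit only when every record in the batch was written. The
    // CanCommitOffsets cast is only valid on the driver, on the stream
    // returned by createDirectStream.
    stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  else
    failed.foreach(println)
}

Since a failed task can be retried and succeed the second time (as Cody points out above), records may still be written more than once under this scheme, so the Cassandra writes should be idempotent.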