I am already doing that for all known messy data. Thanks, Cody, for all your time and input.
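Concretely, the up-front filtering looks roughly like this (isWellFormed and toRow are stand-ins for the real validation and row mapping):

    import com.datastax.spark.connector._  // provides saveToCassandra

    stream1.foreachRDD { (rdd, time) =>
      // Drop records that are already known to be malformed, so the
      // Cassandra write never sees them. isWellFormed and toRow are
      // hypothetical stand-ins for the actual validation and conversion.
      rdd.filter(record => isWellFormed(record.value))
        .map(record => toRow(record))
        .saveToCassandra("keyspace1", "table1")
    }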
On Mon, Aug 7, 2017 at 11:58 AM, Cody Koeninger <c...@koeninger.org> wrote:
> Yes
>
> On Mon, Aug 7, 2017 at 12:32 PM, shyla deshpande
> <deshpandesh...@gmail.com> wrote:
> > Thanks Cody again.
> >
> > No. I am doing a mapping of the Kafka ConsumerRecord to be able to save
> > it in the Cassandra table, and saveToCassandra is an action, so my data
> > does get saved into Cassandra. It is working as expected 99% of the
> > time; the problem is that when there is an exception, I did not want
> > the offsets to be committed.
> >
> > By filtering for unsuccessful attempts, do you mean filtering the bad
> > records...
> >
> > On Mon, Aug 7, 2017 at 9:59 AM, Cody Koeninger <c...@koeninger.org>
> > wrote:
> >>
> >> If literally all you are doing is rdd.map, I wouldn't expect
> >> saveToCassandra to happen at all, since map is not an action.
> >>
> >> Filtering for unsuccessful attempts and collecting those back to the
> >> driver would be one way for the driver to know whether it was safe to
> >> commit.
> >>
> >> On Mon, Aug 7, 2017 at 12:31 AM, shyla deshpande
> >> <deshpandesh...@gmail.com> wrote:
> >> > rdd.map { record => () }.saveToCassandra("keyspace1", "table1") -->
> >> > is running on the executors.
> >> >
> >> > stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) -->
> >> > is running on the driver.
> >> >
> >> > Is this the reason why the Kafka offsets are committed even when an
> >> > exception is raised? If so, is there a way to commit the offsets
> >> > only when there are no exceptions?
> >> >
> >> > On Sun, Aug 6, 2017 at 10:23 PM, shyla deshpande
> >> > <deshpandesh...@gmail.com> wrote:
> >> >>
> >> >> Thanks again Cody,
> >> >>
> >> >> My understanding is that all the code inside foreachRDD runs on the
> >> >> driver, except for
> >> >> rdd.map { record => () }.saveToCassandra("keyspace1", "table1").
> >> >>
> >> >> When the exception is raised, I was thinking I wouldn't be
> >> >> committing the offsets, but the offsets are committed every time,
> >> >> independent of whether an exception was raised or not.
> >> >>
> >> >> It would be helpful if you could explain this behavior.
> >> >>
> >> >> On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger <c...@koeninger.org>
> >> >> wrote:
> >> >>>
> >> >>> I mean that the Kafka consumers running on the executors should
> >> >>> not be automatically committing, because the fact that a message
> >> >>> was read by the consumer has no bearing on whether it was actually
> >> >>> successfully processed after reading.
> >> >>>
> >> >>> It sounds to me like you're confused about where code is running.
> >> >>> foreachRDD runs on the driver, not the executor.
> >> >>>
> >> >>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
> >> >>>
> >> >>> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
> >> >>> <deshpandesh...@gmail.com> wrote:
> >> >>> > Thanks Cody for your response.
> >> >>> >
> >> >>> > All I want to do is commit the offsets only if I am successfully
> >> >>> > able to write to the Cassandra database.
> >> >>> >
> >> >>> > The line // save the rdd to Cassandra database is
> >> >>> > rdd.map { record => () }.saveToCassandra("keyspace1", "table1")
> >> >>> >
> >> >>> > What do you mean by "Executors shouldn't be auto-committing,
> >> >>> > that's why it's being overridden"? It is the executors that do
> >> >>> > the mapping and the saving to Cassandra.
> >> >>> > The status of success or failure of this operation is known
> >> >>> > only on the executor, and that's where I want to commit the
> >> >>> > Kafka offsets. If this is not what I should be doing, then what
> >> >>> > is the right way?
> >> >>> >
> >> >>> > On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger
> >> >>> > <c...@koeninger.org> wrote:
> >> >>> >>
> >> >>> >> If your complaint is about offsets being committed that you
> >> >>> >> didn't expect... auto commit being false on executors shouldn't
> >> >>> >> have anything to do with that. Executors shouldn't be
> >> >>> >> auto-committing, that's why it's being overridden.
> >> >>> >>
> >> >>> >> What you've said and the code you posted isn't really enough to
> >> >>> >> explain what your issue is, e.g.
> >> >>> >>
> >> >>> >> is this line
> >> >>> >>   // save the rdd to Cassandra database
> >> >>> >> a blocking call?
> >> >>> >>
> >> >>> >> Are you sure that the rdd foreach isn't being retried and
> >> >>> >> succeeding the second time around, etc.?
> >> >>> >>
> >> >>> >> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
> >> >>> >> <deshpandesh...@gmail.com> wrote:
> >> >>> >> > Hello All,
> >> >>> >> >
> >> >>> >> > I am using Spark 2.0.2 and spark-streaming-kafka-0-10_2.11.
> >> >>> >> >
> >> >>> >> > I am setting enable.auto.commit to false, and I want to
> >> >>> >> > manually commit the offsets after my output operation is
> >> >>> >> > successful. So when an exception is raised during the
> >> >>> >> > processing, I do not want the offsets to be committed. But it
> >> >>> >> > looks like the offsets are committed automatically even when
> >> >>> >> > the exception is raised, and thereby I am losing data.
> >> >>> >> > In my logs I see "WARN overriding enable.auto.commit to false
> >> >>> >> > for executor", but I don't want it to be overridden. Please
> >> >>> >> > help.
> >> >>> >> >
> >> >>> >> > My code looks like...
> >> >>> >> >
> >> >>> >> > val kafkaParams = Map[String, Object](
> >> >>> >> >   "bootstrap.servers" -> brokers,
> >> >>> >> >   "key.deserializer" -> classOf[StringDeserializer],
> >> >>> >> >   "value.deserializer" -> classOf[StringDeserializer],
> >> >>> >> >   "group.id" -> "Group1",
> >> >>> >> >   "auto.offset.reset" -> offsetresetparameter,
> >> >>> >> >   "enable.auto.commit" -> (false: java.lang.Boolean)
> >> >>> >> > )
> >> >>> >> >
> >> >>> >> > val myTopics = Array("topic1")
> >> >>> >> > val stream1 = KafkaUtils.createDirectStream[String, String](
> >> >>> >> >   ssc,
> >> >>> >> >   PreferConsistent,
> >> >>> >> >   Subscribe[String, String](myTopics, kafkaParams)
> >> >>> >> > )
> >> >>> >> >
> >> >>> >> > stream1.foreachRDD { (rdd, time) =>
> >> >>> >> >   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> >> >>> >> >   try {
> >> >>> >> >     // save the rdd to the Cassandra database
> >> >>> >> >     stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> >> >>> >> >   } catch {
> >> >>> >> >     case ex: Exception =>
> >> >>> >> >       println(ex.toString + " !!!!!! Bad Data, Unable to persist into table !!!!! " + errorOffsetRangesToString(offsetRanges))
> >> >>> >> >   }
> >> >>> >> > }
> >> >>> >> >
> >> >>> >> > ssc.start()
> >> >>> >> > ssc.awaitTermination()
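A minimal sketch of the approach Cody suggests upthread: tag each record's conversion as a success or failure on the executors, save only the successes, bring the failure count back to the driver, and commit only when it is zero. (parseRecord and the row type it returns are hypothetical; the keyspace and table names are the ones from the thread.)

    import scala.util.{Try, Success}
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
    import com.datastax.spark.connector._

    stream1.foreachRDD { (rdd, time) =>
      // Read the offset ranges from the raw Kafka RDD before transforming it.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Runs on the executors: attempt to convert each ConsumerRecord.
      // parseRecord is a hypothetical mapping to a Cassandra row type.
      val attempts = rdd.map(record => Try(parseRecord(record))).cache()

      // Save only the records that converted cleanly. saveToCassandra is an
      // action and blocks on the driver; if the write itself fails, the
      // exception surfaces here, before any commit is attempted.
      attempts.collect { case Success(row) => row }
        .saveToCassandra("keyspace1", "table1")

      // Back on the driver: count the failures; zero means it is safe to commit.
      val failures = attempts.filter(_.isFailure).count()
      if (failures == 0) {
        stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      } else {
        println(s"$failures bad records in batch $time, offsets not committed")
      }
      attempts.unpersist()
    }

Note that commitAsync only enqueues the offsets; the driver's consumer commits them on a later poll, so the safety here comes entirely from calling it only after the save has succeeded.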