Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-07 Thread shyla deshpande
I am doing that already for all known messy data. Thanks Cody for all your time and input.

Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-07 Thread Cody Koeninger
Yes

Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-07 Thread shyla deshpande
Thanks Cody again. No. I am mapping the Kafka ConsumerRecord so that I can save it in the Cassandra table; saveToCassandra is an action, and my data does get saved into Cassandra. It works as expected 99% of the time, except that when there is an exception, I did not want the

Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-07 Thread Cody Koeninger
If literally all you are doing is rdd.map I wouldn't expect saveToCassandra to happen at all, since map is not an action. Filtering for unsuccessful attempts and collecting those back to the driver would be one way for the driver to know whether it was safe to commit.
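A minimal sketch of the "collect failures back to the driver" idea, not code from the thread. It assumes the spark-streaming-kafka-0-10 API and string keys and values; `write` is a hypothetical per-record writer supplied by the caller (it must be serializable), and the object and method names are illustrative only.

```scala
import scala.util.{Failure, Success, Try}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

object CommitOnSuccessSketch {

  // `write` is a hypothetical stand-in for whatever stores one record.
  def commitWhenAllSucceed(
      stream: InputDStream[ConsumerRecord[String, String]],
      write: ConsumerRecord[String, String] => Unit): Unit = {

    stream.foreachRDD { rdd =>
      // Runs on the driver: offset ranges covered by this batch.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // The function passed to flatMap runs on the executors.
      // Only descriptions of failed writes are sent back to the driver.
      val failures: Array[String] = rdd.flatMap { record =>
        Try(write(record)) match {
          case Success(_) => Iterator.empty
          case Failure(e) =>
            Iterator.single(
              s"${record.topic}-${record.partition}@${record.offset}: ${e.getMessage}")
        }
      }.collect() // collect() is the action that triggers the work

      // Back on the driver: commit only if nothing failed.
      if (failures.isEmpty) {
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      } else {
        failures.foreach(f => System.err.println(s"write failed, not committing: $f"))
      }
    }
  }
}
```

Note that skipping the commit does not make the running stream re-read the batch; committed offsets matter when the job restarts, and a later successful batch will commit past the failed range unless the application handles that case itself.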

Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread shyla deshpande
rdd.map { record => ()}.saveToCassandra("keyspace1", "table1") --> is running on executor
stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) --> is running on driver

Is this the reason why Kafka offsets are committed even when an exception is raised? If so is there a way to

Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread shyla deshpande
Thanks again, Cody. My understanding is that all the code inside foreachRDD runs on the driver, except for rdd.map { record => ()}.saveToCassandra("keyspace1", "table1"). When the exception is raised, I expected the offsets not to be committed, but the offsets are committed all the time

Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread Cody Koeninger
I mean that the kafka consumers running on the executors should not be automatically committing, because the fact that a message was read by the consumer has no bearing on whether it was actually successfully processed after reading. It sounds to me like you're confused about where code is

Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread shyla deshpande
Thanks Cody for your response. All I want to do is commit the offsets only if I am able to write to the Cassandra database successfully. The line //save the rdd to Cassandra database is rdd.map { record => ()}.saveToCassandra("keyspace1", "table1"). What do you mean by Executors shouldn't be

Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread Cody Koeninger
If your complaint is about offsets being committed that you didn't expect... auto commit being false on executors shouldn't have anything to do with that. Executors shouldn't be auto-committing, that's why it's being overridden. What you've said and the code you posted isn't really enough to
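To illustrate the override Cody describes, here is a simplified model in plain Scala. It is not the library's code: `executorParams` is a hypothetical name, and the exact set of adjusted keys (here `enable.auto.commit`, `auto.offset.reset`, and a `spark-executor-` prefix on `group.id`) is my understanding of the 0-10 integration and may differ between Spark versions.

```scala
object ExecutorParamsSketch {

  // Hypothetical helper: derives executor-side consumer settings from the
  // settings supplied by the application. Executor consumers only fetch
  // data, so auto-commit is forced off regardless of what the user set.
  def executorParams(driverParams: Map[String, Any]): Map[String, Any] = {
    val groupId = driverParams.getOrElse("group.id", "")
    driverParams ++ Map[String, Any](
      "enable.auto.commit" -> false,
      "auto.offset.reset" -> "none",
      "group.id" -> s"spark-executor-$groupId"
    )
  }

  def main(args: Array[String]): Unit = {
    val userParams = Map[String, Any](
      "group.id" -> "example-group",
      "enable.auto.commit" -> true
    )
    val onExecutor = executorParams(userParams)
    println(onExecutor("enable.auto.commit")) // prints false
    println(onExecutor("group.id"))           // prints spark-executor-example-group
  }
}
```

The point of the model is that the warning about `enable.auto.commit` being overridden concerns executor-side consumers only; whether offsets get committed is decided by the driver-side code that calls `commitAsync`.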

kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-05 Thread shyla deshpande
Hello All, I am using Spark 2.0.2 and spark-streaming-kafka-0-10_2.11. I am setting enable.auto.commit to false and want to commit the offsets manually after my output operation succeeds. So when an exception is raised during processing, I do not want the offsets to be committed.
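The setup described above might look roughly like the following sketch. It assumes spark-streaming-kafka-0-10 and the DataStax spark-cassandra-connector are on the classpath; the broker address, topic, group id, and column names are placeholders, while the keyspace and table names are the ones used elsewhere in this thread.

```scala
import com.datastax.spark.connector._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object ManualCommitSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("manual-commit-sketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",            // placeholder
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",                      // placeholder
      "enable.auto.commit" -> (false: java.lang.Boolean)  // commit manually instead
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Seq("example-topic"), kafkaParams)
    )

    stream.foreachRDD { rdd =>
      // Driver side: remember which offsets this batch covers.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // saveToCassandra is the action; the writes run on the executors.
      // Column names are hypothetical.
      rdd.map(record => (record.key, record.value))
        .saveToCassandra("keyspace1", "table1", SomeColumns("key", "value"))

      // Reached only if the action above did not throw on the driver.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If the save is wrapped in a try/catch that swallows the exception, or the commit is issued outside this ordering, the commit call will still run after a failed write.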