I am doing that already for all known messy data. Thanks, Cody, for all your
time and input.

On Mon, Aug 7, 2017 at 11:58 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Yes
>
> On Mon, Aug 7, 2017 at 12:32 PM, shyla deshpande
> <deshpandesh...@gmail.com> wrote:
> > Thanks Cody again.
> >
> > No. I am doing a mapping of the Kafka ConsumerRecord to be able to save it
> > in the Cassandra table, and saveToCassandra is an action, so my data does
> > get saved into Cassandra. It is working as expected 99% of the time, except
> > that when there is an exception, I did not want the offsets to be committed.
> >
> > By filtering for unsuccessful attempts, do you mean filtering the bad
> > records...
> >
> > On Mon, Aug 7, 2017 at 9:59 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >> If literally all you are doing is rdd.map I wouldn't expect
> >> saveToCassandra to happen at all, since map is not an action.
> >>
> >> Filtering for unsuccessful attempts and collecting those back to the
> >> driver would be one way for the driver to know whether it was safe to
> >> commit.
> >>
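A minimal sketch of that filter-and-collect idea, reusing stream1 from the code further down this thread (saveRecord here is a hypothetical per-record write standing in for the actual Cassandra save):

    stream1.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // attempt the write per record on the executors, capturing failures instead of throwing
      val failed = rdd.map { record =>
        scala.util.Try(saveRecord(record))   // saveRecord: hypothetical per-record write
      }.filter(_.isFailure)
      // collecting (a sample of) the failures back to the driver tells it whether committing is safe
      if (failed.take(1).isEmpty) {
        stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
      // otherwise skip the commit, so the batch is reprocessed from the last committed offsets
    }
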
> >> On Mon, Aug 7, 2017 at 12:31 AM, shyla deshpande
> >> <deshpandesh...@gmail.com> wrote:
> >> > rdd.map { record => ()}.saveToCassandra("keyspace1", "table1")  -->
> >> > is running on the executor.
> >> >
> >> > stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)  -->
> >> > is running on the driver.
> >> >
> >> > Is this the reason why the Kafka offsets are committed even when an
> >> > exception is raised? If so, is there a way to commit the offsets only
> >> > when there are no exceptions?
> >> >
> >> > On Sun, Aug 6, 2017 at 10:23 PM, shyla deshpande
> >> > <deshpandesh...@gmail.com>
> >> > wrote:
> >> >>
> >> >> Thanks again Cody,
> >> >>
> >> >> My understanding is all the code inside foreachRDD is running on the
> >> >> driver except for
> >> >> rdd.map { record => ()}.saveToCassandra("keyspace1", "table1").
> >> >>
> >> >> When the exception is raised, I was thinking I won't be committing the
> >> >> offsets, but the offsets are committed all the time independent of
> >> >> whether an exception was raised or not.
> >> >>
> >> >> It would be helpful if you could explain this behavior.
> >> >>
> >> >>
> >> >> On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger <c...@koeninger.org>
> >> >> wrote:
> >> >>>
> >> >>> I mean that the kafka consumers running on the executors should not be
> >> >>> automatically committing, because the fact that a message was read by
> >> >>> the consumer has no bearing on whether it was actually successfully
> >> >>> processed after reading.
> >> >>>
> >> >>> It sounds to me like you're confused about where code is running.
> >> >>> foreachRDD runs on the driver, not the executor.
> >> >>>
> >> >>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
> >> >>>
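As a rough illustration of that split (a sketch only; the tuple mapping is a placeholder, and keyspace1/table1 are the names used elsewhere in this thread):

    stream1.foreachRDD { rdd =>
      // the body of this closure executes on the driver
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // the function passed to map runs on the executors, but only once an action triggers it;
      // saveToCassandra is that action and should block until the distributed write finishes
      rdd.map { record => (record.key, record.value) }.saveToCassandra("keyspace1", "table1")

      // back on the driver: this runs after saveToCassandra has returned
      stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
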
> >> >>> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
> >> >>> <deshpandesh...@gmail.com> wrote:
> >> >>> > Thanks Cody for your response.
> >> >>> >
> >> >>> > All I want to do is commit the offsets only if I am successfully able
> >> >>> > to write to the Cassandra database.
> >> >>> >
> >> >>> > The line //save the rdd to Cassandra database is
> >> >>> > rdd.map { record => ()}.saveToCassandra("keyspace1", "table1")
> >> >>> >
> >> >>> > What do you mean by "Executors shouldn't be auto-committing, that's
> >> >>> > why it's being overridden"? It is the executors that do the mapping
> >> >>> > and saving to Cassandra. The status of success or failure of this
> >> >>> > operation is known only on the executor, and that's where I want to
> >> >>> > commit the Kafka offsets. If this is not what I should be doing, then
> >> >>> > what is the right way?
> >> >>> >
> >> >>> > On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger <c...@koeninger.org>
> >> >>> > wrote:
> >> >>> >>
> >> >>> >> If your complaint is about offsets being committed that you didn't
> >> >>> >> expect... auto commit being false on executors shouldn't have
> >> >>> >> anything to do with that.  Executors shouldn't be auto-committing,
> >> >>> >> that's why it's being overridden.
> >> >>> >>
> >> >>> >> What you've said and the code you posted isn't really enough to
> >> >>> >> explain what your issue is, e.g.
> >> >>> >>
> >> >>> >> is this line
> >> >>> >> // save the rdd to Cassandra database
> >> >>> >> a blocking call
> >> >>> >>
> >> >>> >> are you sure that the rdd foreach isn't being retried and succeeding
> >> >>> >> the second time around, etc
> >> >>> >>
> >> >>> >> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
> >> >>> >> <deshpandesh...@gmail.com> wrote:
> >> >>> >> > Hello All,
> >> >>> >> > I am using Spark 2.0.2 and spark-streaming-kafka-0-10_2.11.
> >> >>> >> >
> >> >>> >> > I am setting enable.auto.commit to false, and I want to manually
> >> >>> >> > commit the offsets after my output operation is successful. So
> >> >>> >> > when an exception is raised during the processing, I do not want
> >> >>> >> > the offsets to be committed. But it looks like the offsets are
> >> >>> >> > automatically committed even when the exception is raised, and
> >> >>> >> > thereby I am losing data.
> >> >>> >> > In my logs I see  "WARN  overriding enable.auto.commit to false
> >> >>> >> > for executor", but I don't want it to be overridden. Please help.
> >> >>> >> >
> >> >>> >> > My code looks like..
> >> >>> >> >
> >> >>> >> >     val kafkaParams = Map[String, Object](
> >> >>> >> >       "bootstrap.servers" -> brokers,
> >> >>> >> >       "key.deserializer" -> classOf[StringDeserializer],
> >> >>> >> >       "value.deserializer" -> classOf[StringDeserializer],
> >> >>> >> >       "group.id" -> "Group1",
> >> >>> >> >       "auto.offset.reset" -> offsetresetparameter,
> >> >>> >> >       "enable.auto.commit" -> (false: java.lang.Boolean)
> >> >>> >> >     )
> >> >>> >> >
> >> >>> >> >     val myTopics = Array("topic1")
> >> >>> >> >     val stream1 = KafkaUtils.createDirectStream[String, String](
> >> >>> >> >       ssc,
> >> >>> >> >       PreferConsistent,
> >> >>> >> >       Subscribe[String, String](myTopics, kafkaParams)
> >> >>> >> >     )
> >> >>> >> >
> >> >>> >> >     stream1.foreachRDD { (rdd, time) =>
> >> >>> >> >         val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> >> >>> >> >         try {
> >> >>> >> >             //save the rdd to Cassandra database
> >> >>> >> >
> >> >>> >> >             stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> >> >>> >> >         } catch {
> >> >>> >> >           case ex: Exception => {
> >> >>> >> >             println(ex.toString + "!!!!!! Bad Data, Unable to persist into table !!!!!" + errorOffsetRangesToString(offsetRanges))
> >> >>> >> >           }
> >> >>> >> >         }
> >> >>> >> >     }
> >> >>> >> >
> >> >>> >> >     ssc.start()
> >> >>> >> >     ssc.awaitTermination()
> >> >>> >
> >> >>> >
> >> >>
> >> >>
> >> >
> >
> >
>
