Re: Saving Kafka Offsets to Cassandra at beginning of each batch in Spark Streaming
Hi Cody,

I am able to do it using this piece of code:

    kafkaStreamRdd.foreachRDD((rdd, batchMilliSec) -> {
        // Batch time as a Date, to be stored alongside the offsets
        Date currentBatchTime = new Date();
        currentBatchTime.setTime(batchMilliSec.milliseconds());
        List r = new ArrayList();
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        for (int partition = 0; partition < offsetRanges.length; partition++) {
            // Add offsets to the list
        }
        JavaSparkContext ctx = new JavaSparkContext(rdd.context());
        JavaRDD currentBatchOffsets = ctx.parallelize(r);
        // write currentBatchOffsets rdd to cassandra
        return null;
    });

Is this the correct way of doing this?

Thanks !!
Abhi

On Tue, Feb 16, 2016 at 9:31 PM, Cody Koeninger wrote:
> You could use sc.parallelize... but the offsets are already available at
> the driver, and they're a (hopefully) small enough amount of data that
> it's probably more straightforward to just use the normal cassandra client
> to save them from the driver.
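The "Add offsets to the list" step is left open in the snippet above. A minimal sketch of one way to fill it follows, using only the JDK so that it runs without Spark. `Range` and `OffsetRow` are hypothetical stand-ins: `Range` mirrors the four fields of Spark's `OffsetRange`, and `OffsetRow` is an assumed row bean, not a type from the thread or from any library.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

class OffsetRowsSketch {

    // Hypothetical stand-in for Spark's OffsetRange.
    static final class Range {
        final String topic;
        final int partition;
        final long fromOffset;
        final long untilOffset;

        Range(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }
    }

    // Hypothetical row bean: one instance per Kafka partition per batch.
    static final class OffsetRow {
        final String topic;
        final int partition;
        final long fromOffset;
        final long untilOffset;
        final Date batchTime;

        OffsetRow(Range range, Date batchTime) {
            this.topic = range.topic;
            this.partition = range.partition;
            this.fromOffset = range.fromOffset;
            this.untilOffset = range.untilOffset;
            this.batchTime = batchTime;
        }
    }

    // Turns the offset ranges of one batch into rows stamped with the batch time.
    static List<OffsetRow> toRows(Range[] offsetRanges, long batchMillis) {
        Date batchTime = new Date(batchMillis);
        List<OffsetRow> rows = new ArrayList<>(offsetRanges.length);
        for (Range range : offsetRanges) {
            rows.add(new OffsetRow(range, batchTime));
        }
        return rows;
    }

    public static void main(String[] args) {
        Range[] ranges = {
            new Range("events", 0, 100L, 150L),
            new Range("events", 1, 200L, 260L)
        };
        for (OffsetRow row : toRows(ranges, 1455600000000L)) {
            System.out.println(row.topic + "/" + row.partition
                + " " + row.fromOffset + ".." + row.untilOffset);
        }
    }
}
```

In the real job the resulting list would take the place of `r`, whether it is then passed to `ctx.parallelize` or written directly from the driver.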
Re: Saving Kafka Offsets to Cassandra at beginning of each batch in Spark Streaming
You could use "withSessionDo" from the Spark Cassandra Connector to perform the simple insert:

    CassandraConnector(conf).withSessionDo { session => session.execute() }

-Todd

On Tue, Feb 16, 2016 at 11:01 AM, Cody Koeninger wrote:
> You could use sc.parallelize... but the offsets are already available at
> the driver, and they're a (hopefully) small enough amount of data that
> it's probably more straightforward to just use the normal cassandra client
> to save them from the driver.
Re: Saving Kafka Offsets to Cassandra at beginning of each batch in Spark Streaming
You could use sc.parallelize... but the offsets are already available at the driver, and they're a (hopefully) small enough amount of data that it's probably more straightforward to just use the normal cassandra client to save them from the driver.

On Tue, Feb 16, 2016 at 1:15 AM, Abhishek Anand wrote:
> I have a kafka rdd and I need to save the offsets to a cassandra table at
> the beginning of each batch.
>
> Basically I need to write the offsets of the type Offsets below that I am
> getting inside foreachRDD, to cassandra. The javafunctions api to write to
> cassandra needs an rdd. How can I create an rdd from offsets and write to
> a cassandra table?
>
>     public static void writeOffsets(JavaPairDStream<String, String> kafkastream) {
>         kafkastream.foreachRDD((rdd, batchMilliSec) -> {
>             OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>             return null;
>         });
>     }
>
> Thanks !!
> Abhi
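A minimal sketch of the driver-side approach suggested above, written against the JDK only so that it runs without Spark or a Cassandra cluster. Everything specific in it is an assumption: `Range` is a hypothetical stand-in for Spark's `OffsetRange`, the `kafka_offsets` table and its columns are invented for illustration, and the `Consumer<String>` takes the place of whatever executes CQL in the real job (for example a Cassandra session's `execute`).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class DriverSideOffsetWriter {

    // Hypothetical stand-in for Spark's OffsetRange.
    static final class Range {
        final String topic;
        final int partition;
        final long fromOffset;
        final long untilOffset;

        Range(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }
    }

    // Builds one INSERT per Kafka partition.
    // Table and column names are assumptions made for this sketch.
    static List<String> toInsertStatements(List<Range> ranges, long batchTimeMs) {
        List<String> statements = new ArrayList<>();
        for (Range r : ranges) {
            statements.add(String.format(
                "INSERT INTO kafka_offsets "
                    + "(topic, part, batch_time, from_offset, until_offset) "
                    + "VALUES ('%s', %d, %d, %d, %d)",
                r.topic, r.partition, batchTimeMs, r.fromOffset, r.untilOffset));
        }
        return statements;
    }

    // Runs entirely on the driver: no RDD is created for the offsets.
    static void saveOffsets(List<Range> ranges, long batchTimeMs, Consumer<String> execute) {
        toInsertStatements(ranges, batchTimeMs).forEach(execute);
    }

    public static void main(String[] args) {
        List<Range> ranges = Arrays.asList(
            new Range("events", 0, 100L, 150L),
            new Range("events", 1, 200L, 260L));
        // Printing stands in for executing the statements against Cassandra.
        saveOffsets(ranges, 1455600000000L, System.out::println);
    }
}
```

Inside `foreachRDD`, the list would be built from `offsetRanges()` and the consumer would wrap the session call. Formatting values into the CQL string keeps the sketch short; in real code a prepared statement with bound values would likely be the safer choice.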
Saving Kafka Offsets to Cassandra at beginning of each batch in Spark Streaming
I have a kafka rdd and I need to save the offsets to a cassandra table at the beginning of each batch.

Basically I need to write the offsets (the OffsetRange[] array below) that I am getting inside foreachRDD to cassandra. The javafunctions api to write to cassandra needs an rdd. How can I create an rdd from the offsets and write it to a cassandra table?

    public static void writeOffsets(JavaPairDStream<String, String> kafkastream) {
        kafkastream.foreachRDD((rdd, batchMilliSec) -> {
            // Offset ranges of this batch, one entry per Kafka partition
            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            return null;
        });
    }

Thanks !!
Abhi