Re: Saving Kafka Offsets to Cassandra at begining of each batch in Spark Streaming

2016-02-16 Thread Abhishek Anand
Hi Cody, I am able to do using this piece of code kafkaStreamRdd.foreachRDD((rdd,batchMilliSec) -> { Date currentBatchTime = new Date(); currentBatchTime.setTime(batchMilliSec.milliseconds()); List r = new ArrayList(); OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges();

Re: Saving Kafka Offsets to Cassandra at begining of each batch in Spark Streaming

2016-02-16 Thread Todd Nist
You could use the "withSessionDo" of the SparkCassandrConnector to preform the simple insert: CassandraConnector(conf).withSessionDo { session => session.execute() } -Todd On Tue, Feb 16, 2016 at 11:01 AM, Cody Koeninger wrote: > You could use sc.parallelize... but the

Re: Saving Kafka Offsets to Cassandra at begining of each batch in Spark Streaming

2016-02-16 Thread Cody Koeninger
You could use sc.parallelize... but the offsets are already available at the driver, and they're a (hopefully) small enough amount of data that's it's probably more straightforward to just use the normal cassandra client to save them from the driver. On Tue, Feb 16, 2016 at 1:15 AM, Abhishek

Saving Kafka Offsets to Cassandra at begining of each batch in Spark Streaming

2016-02-15 Thread Abhishek Anand
I have a kafka rdd and I need to save the offsets to cassandra table at the begining of each batch. Basically I need to write the offsets of the type Offsets below that I am getting inside foreachRD, to cassandra. The javafunctions api to write to cassandra needs a rdd. How can I create a rdd