Hi Cody,
I was able to do it using this piece of code:
kafkaStreamRdd.foreachRDD((rdd, batchMilliSec) -> {
    Date currentBatchTime = new Date();
    currentBatchTime.setTime(batchMilliSec.milliseconds());
    List<OffsetRange> r = new ArrayList<>();
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    ...
});
You could use the "withSessionDo" method of the Spark Cassandra Connector to perform
the simple insert:
CassandraConnector(conf).withSessionDo { session => session.execute() }
-Todd
On Tue, Feb 16, 2016 at 11:01 AM, Cody Koeninger wrote:
You could use sc.parallelize... but the offsets are already available at
the driver, and they're a (hopefully) small enough amount of data that
it's probably more straightforward to just use the normal Cassandra client
to save them from the driver.
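Cody's driver-side approach could be sketched as below with a plain Cassandra client. The keyspace, table name, and schema (`ks.kafka_offsets`) are assumptions for illustration, not from this thread, and the actual `session.execute(...)` call (DataStax Java driver) is left commented out so the sketch compiles standalone:

```java
import java.util.Arrays;
import java.util.List;

// Sketch: persist each batch's offset ranges from the Spark driver using a
// plain Cassandra client instead of building an RDD. The keyspace/table
// ("ks.kafka_offsets") and its schema are hypothetical.
public class OffsetSaver {

    // Minimal stand-in for org.apache.spark.streaming.kafka.OffsetRange,
    // so this sketch runs without Spark on the classpath.
    static final class Offset {
        final String topic;
        final int partition;
        final long untilOffset;
        Offset(String topic, int partition, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.untilOffset = untilOffset;
        }
    }

    // The CQL statement used for each offset range (pure, testable anywhere).
    static String insertCql() {
        return "INSERT INTO ks.kafka_offsets (topic, partition, until_offset) VALUES (?, ?, ?)";
    }

    // On the real driver you would pass a com.datastax.driver.core.Session
    // here and execute the bound insert per offset range.
    static void saveOffsets(List<Offset> offsets /*, Session session */) {
        for (Offset o : offsets) {
            // session.execute(insertCql(), o.topic, o.partition, o.untilOffset);
            System.out.println(insertCql() + " -> "
                    + o.topic + "/" + o.partition + "/" + o.untilOffset);
        }
    }

    public static void main(String[] args) {
        saveOffsets(Arrays.asList(new Offset("events", 0, 42L)));
    }
}
```

Since the offset ranges already live on the driver, this avoids the detour of parallelizing them into an RDD just to reach the connector's write path.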
On Tue, Feb 16, 2016 at 1:15 AM, Abhishek wrote:
I have a Kafka RDD and I need to save the offsets to a Cassandra table at the
beginning of each batch.
Basically, I need to write the offsets of the type Offsets below, which I am
getting inside foreachRDD, to Cassandra. The javaFunctions API to write to
Cassandra needs an RDD. How can I create an RDD?