You can just do something like this; the Spark Cassandra Connector handles the
rest:

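import com.datastax.spark.connector.streaming._  // adds saveToCassandra to DStreams
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map(KafkaTopicRaw -> 10), StorageLevel.DISK_ONLY_2)
  .map { case (_, line) => line.split(",") }  // drop the Kafka key, split the CSV line
  .map(RawWeatherData(_))                     // build one row object per message
  .saveToCassandra(CassandraKeyspace, CassandraTableRaw)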
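
For the .map(RawWeatherData(_)) step to compile, RawWeatherData needs an
apply that accepts the Array[String] produced by split. Below is a minimal
sketch of what such a type could look like. The field names and types are
assumptions made up for illustration, not the real table schema. As far as I
know, the connector matches case class fields to Cassandra columns by name,
so the fields would have to line up with your table.

```scala
// Hypothetical row type: the fields below are illustrative assumptions only.
case class RawWeatherData(wsid: String, year: Int, month: Int, temperature: Double)

object RawWeatherData {
  // Extra constructor used by .map(RawWeatherData(_)):
  // takes one comma-split line and converts each field to its column type.
  def apply(fields: Array[String]): RawWeatherData =
    RawWeatherData(fields(0), fields(1).toInt, fields(2).toInt, fields(3).toDouble)
}
```

With that in place, RawWeatherData("725030:14732,2008,12,3.5".split(","))
gives you one row object per Kafka message.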
- Helena
@helenaedelson

On Dec 4, 2014, at 9:51 AM, m.sar...@accenture.com wrote:

> Hi,
> 
> I have written the code below, which streams data from Kafka and prints it
> to the console.
> I want to extend it so that the data goes into a Cassandra table instead.
> 
> JavaStreamingContext jssc = new JavaStreamingContext(
>     "local[4]", "SparkStream", new Duration(1000));
> JavaPairReceiverInputDStream<String, String> messages =
>     KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
> 
> System.out.println("Connection done!");
> JavaDStream<String> data = messages.map(
>     new Function<Tuple2<String, String>, String>() {
>         public String call(Tuple2<String, String> message) {
>             return message._2();
>         }
>     });
> //data.print();   --> output to console
> data.foreachRDD(saveToCassandra("mykeyspace","mytable"));
> jssc.start();
> jssc.awaitTermination();
> 
> 
> How should I implement the line
> data.foreachRDD(saveToCassandra("mykeyspace","mytable"));
> so that the data goes into Cassandra in each batch? And how do I specify a
> batch? If I press Ctrl+C on the console of the streaming job jar, nothing
> will be written to Cassandra, since the job is killed.
> 
> Please help.
> 
> Thanks and Regards,
> 
> Md. Aiman Sarosh.
> Accenture Services Pvt. Ltd.
> Mob #:  (+91) - 9836112841.
