Re: Spark-Streaming: output to cassandra
Here's an example of a Cassandra ETL that you can follow, which should exit on its own. I'm using it as a blueprint for building Spark Streaming apps on top of. For me, I kill the streaming app with System.exit after a sufficient amount of data is collected. That seems to work for most any scenario, but I guess you could also kill it on the stream-handler side as well if you are writing a custom DStream.

https://github.com/jayunit100/SparkBlueprint/blob/master/src/main/scala/sparkapps/tweetstream/Processor.scala

On Dec 5, 2014, at 1:50 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Batch is the batch duration that you specify while creating the StreamingContext, so at the end of every batch's computation the data will get flushed to Cassandra. And why are you stopping your program with Ctrl+C? You can always specify the time with awaitTermination(Duration) on the streaming context.

Thanks
Best Regards
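To make the "kill after a sufficient amount of data" idea concrete, below is a rough, untested sketch of one way to do it without Ctrl+C or a bare System.exit. It assumes the Spark 1.x Java streaming API and the JavaDStream<String> data from the original question (quoted further down in the thread); the record threshold and polling interval are made-up values for illustration only.

    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;

    final AtomicLong recordsSeen = new AtomicLong(0);
    final long enoughRecords = 100000;   // hypothetical "sufficient amount of data"

    // Count what each batch delivered; the foreachRDD function runs on the driver.
    data.foreachRDD(new Function<JavaRDD<String>, Void>() {
        public Void call(JavaRDD<String> rdd) {
            recordsSeen.addAndGet(rdd.count());
            return null;
        }
    });

    jssc.start();

    // Poll on the driver and shut down once enough records have been processed.
    while (recordsSeen.get() < enoughRecords) {
        try { Thread.sleep(5000); } catch (InterruptedException e) { break; }
    }
    // Graceful stop: let in-flight batches finish (and flush to Cassandra) before exiting.
    jssc.stop(true, true);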
Re: Spark-Streaming: output to cassandra
You can just do something like this; the Spark Cassandra Connector handles the rest:

    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Map(KafkaTopicRaw -> 10), StorageLevel.DISK_ONLY_2)
      .map { case (_, line) => line.split(",") }
      .map(RawWeatherData(_))
      .saveToCassandra(CassandraKeyspace, CassandraTableRaw)

- Helena
@helenaedelson

On Dec 4, 2014, at 9:51 AM, m.sar...@accenture.com wrote:

Hi,

I have written the code below which is streaming data from Kafka and printing it to the console. I want to extend this, and want my data to go into a Cassandra table instead.

    JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1000));
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    System.out.println("Connection done!");

    JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> message) {
            return message._2();
        }
    });

    // data.print();   // output to console
    data.foreachRDD(saveToCassandra("mykeyspace", "mytable"));

    jssc.start();
    jssc.awaitTermination();

How should I implement the line data.foreachRDD(saveToCassandra("mykeyspace","mytable")); so that data goes into Cassandra, in each batch? And how do I specify a batch? Because if I do Ctrl+C on the console of the streaming job jar, nothing will be entered into Cassandra for sure, since it is getting killed. Please help.

Thanks and Regards,
Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #: (+91) - 9836112841.
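Since the original question uses the Java API, here is a rough, untested Java sketch of the same shape as the Scala snippet above. It assumes the foreachRDD signature of the Spark 1.x Java API and the javaFunctions(rdd, Class).saveToCassandra(keyspace, table) call that appears later in this thread; the RawEvent bean, the CSV layout, and the keyspace/table names are placeholders, not something defined in the thread.

    import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import scala.Tuple2;

    // RawEvent is a hypothetical bean whose fields match the target table's columns.
    JavaDStream<RawEvent> events = messages.map(
        new Function<Tuple2<String, String>, RawEvent>() {
            public RawEvent call(Tuple2<String, String> message) {
                String[] parts = message._2().split(",");   // same split(",") idea as above
                return new RawEvent(parts[0], parts[1]);
            }
        });

    // One write per batch: the connector turns each batch's RDD of beans into Cassandra rows.
    events.foreachRDD(new Function<JavaRDD<RawEvent>, Void>() {
        public Void call(JavaRDD<RawEvent> rdd) {
            javaFunctions(rdd, RawEvent.class).saveToCassandra("mykeyspace", "mytable");
            return null;
        }
    });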
Re: Spark-Streaming: output to cassandra
Hi Akhil, Vyas, Helena,

Thank you for your suggestions. As Akhil suggested earlier, I have implemented the batch Duration in the JavaStreamingContext and awaitTermination(Duration). The approach Helena suggested is Scala-oriented. But the issue now is that I want to set Cassandra as my output.

I have created a table in Cassandra, test_table, with columns key (text, primary key) and value (text). I have mapped the data successfully into JavaDStream<Tuple2<String, String>> data:

    JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
    JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

    JavaDStream<Tuple2<String, String>> data = messages.map(
        new Function<Tuple2<String, String>, Tuple2<String, String>>() {
            public Tuple2<String, String> call(Tuple2<String, String> message) {
                return new Tuple2<String, String>(message._1(), message._2());
            }
        });

Then I have created a List:

    List<TestTable> list = new ArrayList<TestTable>();

where TestTable is my custom class having the same structure as my Cassandra table, with members key and value:

    class TestTable {
        String key;
        String val;

        public TestTable() {}
        public TestTable(String k, String v) { key = k; val = v; }

        public String getKey() { return key; }
        public void setKey(String k) { key = k; }
        public String getVal() { return val; }
        public void setVal(String v) { val = v; }
        public String toString() { return "Key:" + key + ",Val:" + val; }
    }

Please suggest a way to add the data from JavaDStream<Tuple2<String, String>> data into the List<TestTable> list. I am doing this so that I can subsequently use

    JavaRDD<TestTable> rdd = sc.parallelize(list);
    javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");

to save the RDD data into Cassandra.

I had tried coding this way:

    messages.foreachRDD(new Function<Tuple2<String, String>, String>() {
        public List<TestTable> call(Tuple2<String, String> message) {
            String k = message._1();
            String v = message._2();
            TestTable tbl = new TestTable(k, v);
            list.put(tbl);
        }
    });

but it seems some type mismatch is happening. Please help.

Thanks and Regards,
Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #: (+91) - 9836112841.
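The type mismatch above most likely comes from foreachRDD: in the Spark 1.x Java API its Function receives the whole JavaRDD of a batch (and returns Void), not an individual Tuple2. A minimal, untested sketch of the corrected shape, reusing the data stream, the TestTable bean, and the javaFunctions(...).saveToCassandra(...) call from the message above:

    import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import scala.Tuple2;

    data.foreachRDD(new Function<JavaRDD<Tuple2<String, String>>, Void>() {
        public Void call(JavaRDD<Tuple2<String, String>> batch) {
            // Turn each (key, value) pair of this batch into a TestTable bean ...
            JavaRDD<TestTable> rows = batch.map(
                new Function<Tuple2<String, String>, TestTable>() {
                    public TestTable call(Tuple2<String, String> t) {
                        return new TestTable(t._1(), t._2());
                    }
                });
            // ... and let the connector write the batch; no List or sc.parallelize required.
            javaFunctions(rows, TestTable.class).saveToCassandra("testkeyspace", "test_table");
            return null;
        }
    });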
Re: Spark-Streaming: output to cassandra
I think what you are looking for is something like:

    JavaRDD<Double> pricesRDD = javaFunctions(sc).cassandraTable("ks", "tab", mapColumnTo(Double.class)).select("price");
    JavaRDD<Person> rdd = javaFunctions(sc).cassandraTable("ks", "people", mapRowTo(Person.class));

noted here: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md ?

- Helena
@helenaedelson
Re: Spark-Streaming: output to cassandra
Thank you Helena. But I would like to explain my problem space:

The output is supposed to be Cassandra. To achieve that, I have to use the spark-cassandra-connector APIs. So, going in a bottom-up approach, to write to Cassandra I have to use:

    javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");

To use javaFunctions above, I need to obtain the JavaRDD object using sc.parallelize(), like this:

    JavaRDD<TestTable> rdd = sc.parallelize(list);

sc.parallelize(list) accepts a List object as a parameter, and that List object will contain the data obtained from the JavaDStream or JavaPairReceiverInputDStream that has the streaming data.

So the flow I need is:
1. JavaDStream data to get mapped into a List.
2. The List object to be passed to sc.parallelize(list) to obtain a JavaRDD object.
3. The JavaRDD object to be passed to javaFunctions().saveToCassandra().

For all this I need code that maps my JavaDStream data into a List. Once step 1 is done, steps 2 and 3 can easily be performed. I need help with step 1. I have written the code below to do it:

    JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
    JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    System.out.println("Connection done!");

    /* connection to cassandra */
    CassandraConnector connector = CassandraConnector.apply(sc.getConf());
    Session session = connector.openSession();
    session.execute("CREATE TABLE testkeyspace.test_table (key TEXT PRIMARY KEY, value TEXT)");

    List<TestTable> list = new ArrayList<TestTable>();

    messages.foreachRDD(new Function<Tuple2<String, String>, Void>() {
        public Void call(Tuple2<String, String> message) {
            String k = message._1();
            String v = message._2();
            TestTable tbl = new TestTable(k, v);
            list.add(tbl);
            return null;
        }
    });

    jssc.start();
    jssc.awaitTermination(new Duration(60 * 1000));

It would be a great help if a way is suggested to map the JavaDStream to a List.

Thanks and Regards,
Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #: (+91) - 9836112841.
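Step 1 above may not be needed at all: foreachRDD already hands the driver a JavaRDD for every batch, so the List and the sc.parallelize() call can be dropped. A rough, untested sketch against the messages stream defined above (Spark 1.x Java API, and the same javaFunctions call as in the earlier message; JavaPairRDD is the batch type a pair DStream delivers):

    import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import scala.Tuple2;

    messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
        public Void call(JavaPairRDD<String, String> batch) {
            // Convert this batch's (key, value) pairs into TestTable beans.
            JavaRDD<TestTable> rows = batch.map(
                new Function<Tuple2<String, String>, TestTable>() {
                    public TestTable call(Tuple2<String, String> t) {
                        return new TestTable(t._1(), t._2());
                    }
                });
            // Write the batch straight to Cassandra.
            javaFunctions(rows, TestTable.class).saveToCassandra("testkeyspace", "test_table");
            return null;
        }
    });

Because this writes at the end of every batch, there is also no need to accumulate rows in a driver-side List across batches before saving.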
Re: Spark-Streaming: output to cassandra
You can use Datastax's Cassandra connector: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md

Thanks
Best Regards
Re: Spark-Streaming: output to cassandra
I guess he's already doing so, given the 'saveToCassandra' usage. What I don't understand is the question "how do I specify a batch". That doesn't make much sense to me. Could you explain further?

-kr, Gerard.

On Thu, Dec 4, 2014 at 5:36 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

You can use Datastax's Cassandra connector: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
Re: Spark-Streaming: output to cassandra
Hi Gerard/Akhil,

By "how do I specify a batch" I was trying to ask when the data in the JavaDStream gets flushed into the Cassandra table. I read somewhere that the streaming data gets written to Cassandra in batches. This batch can be of some particular time, or one particular run. That was what I was trying to understand: how to set that batch in my program. Because if a batch means one cycle run of my streaming app, then in my app I'm hitting Ctrl+C to kill the program. So the program is terminating, and would the data still get inserted successfully into my Cassandra table?

For example, in Terminal-A I'm running the Kafka producer to stream in messages, and in Terminal-B I'm running my streaming app. In my app there is a line jssc.awaitTermination(); which will keep my app running until I kill it. Eventually I hit Ctrl+C in my app terminal, i.e. Terminal-B, and kill it, so it is a kind of ungraceful termination. In this case, will the data in my app's DStream get written into Cassandra?

Thanks and Regards,
Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #: (+91) - 9836112841.
Re: Spark-Streaming: output to cassandra
Batch is the batch duration that you specify while creating the StreamingContext, so at the end of every batch's computation the data will get flushed to Cassandra. And why are you stopping your program with Ctrl+C? You can always specify the time with awaitTermination(Duration) on the streaming context.

Thanks
Best Regards
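To make the two time settings in this reply concrete, a small, untested sketch (Spark 1.x Java API; the 5-second and 60-second values are only illustrative): the Duration given to the StreamingContext is the batch interval at which output operations, such as a per-batch save to Cassandra, run, while the awaitTermination timeout only bounds how long the driver blocks.

    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // Batch interval: a new micro-batch (and hence a new write to Cassandra) every 5 seconds.
    JavaStreamingContext jssc =
        new JavaStreamingContext("local[4]", "SparkStream", new Duration(5000));

    // ... Kafka input and foreachRDD(...saveToCassandra(...)) as in the earlier messages ...

    jssc.start();
    // Separate knob: how long the driver blocks (in milliseconds) before this call returns.
    jssc.awaitTermination(60 * 1000);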