Hi Javier,

I will try to implement this in Scala then. As far as I can see in the documentation, there is no saveToCassandra in the Python interface unless you are working with DataFrames, and the kafkaStream instance does not provide methods to convert an RDD into a DataFrame.
Regarding my table, it is very simple (see below). Can I change something to make it write faster?

CREATE TABLE test_hdpkns.measurement (
    mid bigint,
    tt timestamp,
    in_tt timestamp,
    out_tt timestamp,
    sensor_id int,
    measure double,
    PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
) with compact storage;

The system CPU while the demo is running is almost always at 100% for both cores.

Thank you.

Best Regards,

On 29/04/2018 20:46:30, Javier Pareja <pareja.jav...@gmail.com> wrote:

Hi Saulo,

I meant using this to save: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#writing-to-cassandra-from-a-stream

But the slowness might be in a different area. Another point is that Cassandra and Spark running on the same machine might compete for resources, which will slow down the insert. You can check the CPU usage of the machine at the time. The design of the table schema can also make a big difference.

On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro, <saulo.sobre...@outlook.pt> wrote:

Hi Javier,

I removed the map and used "map" directly instead of using transform, but the kafkaStream is created with KafkaUtils, which does not have a method to save to Cassandra directly. Do you know any workaround for this?

Thank you for the suggestion.

Best Regards,

On 29/04/2018 17:03:24, Javier Pareja <pareja.jav...@gmail.com> wrote:

Hi Saulo,

I'm no expert but I will give it a try. I would remove the rdd2.count(); I can't see the point of it, and you will gain performance right away. Without it, I would not use a transform, just the map directly. I have not used Python, but in Scala the cassandra-spark connector can save directly to Cassandra without a foreachRDD. Finally, I would use the Spark UI to find which stage is the bottleneck here.
On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <saulo.sobre...@outlook.pt> wrote:

Hi all,

I am implementing a use case where I read some sensor data from Kafka with the Spark Streaming interface (KafkaUtils.createDirectStream) and, after some transformations, write the output (RDD) to Cassandra.

Everything is working properly, but I am having some trouble with the performance. My Kafka topic receives around 2000 messages per second. For a 4 min. test, the Spark Streaming app takes 6~7 min. to process and write to Cassandra, which is not acceptable for longer runs.

I am running this application in a "sandbox" with 12GB of RAM, 2 cores and 30GB SSD space.
Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).

I would like to know whether you have any suggestions to improve performance (other than getting more resources :) ).

My code (pyspark) is posted at the end of this email so you can take a look.

I tried some different Cassandra configurations following this link: http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark (recommended on Stack Overflow for similar questions).
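The figures quoted above can be turned into a rough throughput estimate. The sketch below is only back-of-the-envelope arithmetic on the numbers in the message (2000 messages per second, a 4 min. test, 6~7 min. of processing, and the 2-second batch interval from the script). The function names are hypothetical, and it assumes batches are processed one after another with no startup overhead.

```python
# Numbers taken from the message; everything derived from them is an estimate.
MSGS_PER_SECOND = 2000      # Kafka input rate
TEST_SECONDS = 4 * 60       # length of the test run
BATCH_INTERVAL = 2          # batchIntervalSeconds in the script

total_messages = MSGS_PER_SECOND * TEST_SECONDS
records_per_batch = MSGS_PER_SECOND * BATCH_INTERVAL


def sustained_throughput(messages, processing_seconds):
    """Average messages per second actually processed and written."""
    return messages / processing_seconds


def avg_batch_seconds(test_seconds, batch_interval, processing_seconds):
    """Average processing time per micro-batch, assuming sequential batches."""
    batches = test_seconds / batch_interval
    return processing_seconds / batches


for minutes in (6, 7):
    secs = minutes * 60
    print(
        f"{minutes} min: "
        f"{sustained_throughput(total_messages, secs):.0f} msg/s, "
        f"{avg_batch_seconds(TEST_SECONDS, BATCH_INTERVAL, secs):.1f} s per batch"
    )
```

Under these assumptions, each 2-second batch takes longer than 2 seconds to process on average, which would be consistent with the backlog growing for as long as the input keeps arriving.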
Thank you in advance,

Best Regards,
Saulo

=============== # CODE # =================================
####
# run command:
# spark2-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2 --conf spark.cassandra.connection.host='localhost' --num-executors 2 --executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2
##

# Run Spark imports
from pyspark import SparkConf # SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Run Cassandra imports
import pyspark_cassandra
from pyspark_cassandra import CassandraSparkContext, saveToCassandra

def recordHandler(record):
    (mid, tt, in_tt, sid, mv) = parseData( record )
    return processMetrics(mid, tt, in_tt, sid, mv)

def process(time, rdd):
    rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
    if rdd2.count() > 0:
        return rdd2

def casssave(time, rdd):
    rdd.saveToCassandra( "test_hdpkns", "measurement" )

# ...
brokers, topic = sys.argv[1:]

# ...
sconf = SparkConf() \
    .setAppName("SensorDataStreamHandler") \
    .setMaster("local[*]") \
    .set("spark.default.parallelism", "2")

sc = CassandraSparkContext(conf = sconf)
batchIntervalSeconds = 2
ssc = StreamingContext(sc, batchIntervalSeconds)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

kafkaStream \
    .transform(process) \
    .foreachRDD(casssave)

ssc.start()
ssc.awaitTermination()
================================================
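The suggestion made in the replies (drop the rdd2.count() check and map the stream directly instead of going through transform) could look roughly like the sketch below. It is a minimal sketch rather than a tested fix: build_pipeline and save_batch are hypothetical helper names, record_handler stands in for the recordHandler function in the script, and it assumes saveToCassandra is available on RDDs through pyspark_cassandra, as in the original code. Whether this removes the bottleneck is not established in the thread.

```python
def save_batch(rdd, keyspace="test_hdpkns", table="measurement"):
    """Write one micro-batch to Cassandra.

    Assumes rdd.saveToCassandra exists, as provided by pyspark_cassandra
    in the original script.
    """
    rdd.saveToCassandra(keyspace, table)


def build_pipeline(kafka_stream, record_handler):
    """Map each Kafka (key, value) pair to a row and save every batch.

    No count() is issued, so the mapped data is not evaluated an extra
    time just to check whether the batch is empty.
    """
    # Each element of the direct stream is a (key, value) pair; only the
    # value is parsed, as in the original lambda w: recordHandler(w[1]).
    rows = kafka_stream.map(lambda kv: record_handler(kv[1]))
    # foreachRDD accepts a function of (time, rdd), like casssave does.
    rows.foreachRDD(lambda time, rdd: save_batch(rdd))
    return rows
```

In the script, this would replace the kafkaStream.transform(process).foreachRDD(casssave) chain with a call such as build_pipeline(kafkaStream, recordHandler), made before ssc.start().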