Hi Saulo,

If the CPU is close to 100% then you are hitting the limit, and I don't think that moving to Scala will make a difference. Both Spark and Cassandra are CPU hungry, and your setup is small in terms of CPUs. Try running Spark on another (physical) machine so that the 2 cores are dedicated to Cassandra.
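For example (a minimal sketch only; "spark-host" is a hypothetical host name), instead of local[*] on the Cassandra node you would point the app at a standalone master running on the other machine:

# Sketch: assumes a standalone Spark master is running on a separate
# machine; "spark-host" is a hypothetical host name.
from pyspark import SparkConf

sconf = SparkConf() \
    .setAppName("SensorDataStreamHandler") \
    .setMaster("spark://spark-host:7077")  # instead of local[*]

I have also put two quick sketches at the bottom of this mail, below the quoted thread, on the transform/count and saveToCassandra points raised earlier.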
Kind Regards,
Javier

On Mon, 30 Apr 2018, 14:24 Saulo Sobreiro, <saulo.sobre...@outlook.pt> wrote:

> Hi Javier,
>
> I will try to implement this in Scala then. As far as I can see in the
> documentation, there is no saveToCassandra in the Python interface unless
> you are working with DataFrames, and the kafkaStream instance does not
> provide methods to convert an RDD into a DF.
>
> Regarding my table, it is very simple (see below). Can I change something
> to make it write faster?
>
> CREATE TABLE test_hdpkns.measurement (
>     mid bigint,
>     tt timestamp,
>     in_tt timestamp,
>     out_tt timestamp,
>     sensor_id int,
>     measure double,
>     PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
> ) with compact storage;
>
> The system CPU while the demo is running is almost always at 100% on both
> cores.
>
> Thank you.
>
> Best Regards,
>
> On 29/04/2018 20:46:30, Javier Pareja <pareja.jav...@gmail.com> wrote:
> Hi Saulo,
>
> I meant using this to save:
> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#writing-to-cassandra-from-a-stream
>
> But it might be slow in a different area.
> Another point is that Cassandra and Spark running on the same machine
> might compete for resources, which will slow down the inserts. You can
> check the CPU usage of the machine at the time. Also, the design of the
> table schema can make a big difference.
>
> On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
> wrote:
>
>> Hi Javier,
>>
>> I removed the transform and used "map" directly instead, but the
>> kafkaStream instance is created with KafkaUtils, which does not have a
>> method to save to Cassandra directly.
>>
>> Do you know any workaround for this?
>>
>> Thank you for the suggestion.
>>
>> Best Regards,
>>
>> On 29/04/2018 17:03:24, Javier Pareja <pareja.jav...@gmail.com> wrote:
>> Hi Saulo,
>>
>> I'm no expert, but I will give it a try.
>> I would remove the rdd2.count(); I can't see the point of it, and you
>> will gain performance right away. Without the count you don't need a
>> transform either, just the map directly.
>> I have not used Python, but in Scala the spark-cassandra connector can
>> save a stream directly to Cassandra without a foreachRDD.
>>
>> Finally, I would use the Spark UI to find which stage is the bottleneck
>> here.
>>
>> On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
>> wrote:
>>
>>> Hi all,
>>>
>>> I am implementing a use case where I read some sensor data from Kafka
>>> with the Spark Streaming interface (KafkaUtils.createDirectStream) and,
>>> after some transformations, write the output (RDD) to Cassandra.
>>>
>>> Everything is working properly, but I am having some trouble with the
>>> performance. My Kafka topic receives around 2000 messages per second.
>>> For a 4 min test, the Spark Streaming app takes 6-7 min to process and
>>> write to Cassandra, which is not acceptable for longer runs.
>>>
>>> I am running this application in a "sandbox" with 12 GB of RAM, 2 cores
>>> and 30 GB of SSD space.
>>> Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).
>>>
>>> I would like to know if you have any suggestions to improve performance
>>> (other than getting more resources :) ).
>>>
>>> My code (pyspark) is posted at the end of this email so you can take a
>>> look. I tried some different Cassandra configurations following this
>>> link:
>>> http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark
>>> (recommended on Stack Overflow for similar questions).
>>>
>>> Thank you in advance,
>>>
>>> Best Regards,
>>> Saulo
>>>
>>> =============== # CODE # =================================
>>> ####
>>> # run command:
>>> # spark2-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2 \
>>> #   --conf spark.cassandra.connection.host='localhost' \
>>> #   --num-executors 2 --executor-cores 2 \
>>> #   SensorDataStreamHandler.py localhost:6667 test_topic2
>>> ##
>>>
>>> import sys
>>>
>>> # Spark imports
>>> from pyspark import SparkConf
>>> from pyspark.streaming import StreamingContext
>>> from pyspark.streaming.kafka import KafkaUtils
>>>
>>> # Cassandra imports
>>> import pyspark_cassandra
>>> from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>>>
>>> # parse a raw Kafka message and compute the metrics to store
>>> def recordHandler(record):
>>>     (mid, tt, in_tt, sid, mv) = parseData(record)
>>>     return processMetrics(mid, tt, in_tt, sid, mv)
>>>
>>> def process(time, rdd):
>>>     rdd2 = rdd.map(lambda w: recordHandler(w[1]))
>>>     if rdd2.count() > 0:
>>>         return rdd2
>>>
>>> def casssave(time, rdd):
>>>     rdd.saveToCassandra("test_hdpkns", "measurement")
>>>
>>> # ...
>>> brokers, topic = sys.argv[1:]
>>>
>>> # ...
>>>
>>> sconf = SparkConf() \
>>>     .setAppName("SensorDataStreamHandler") \
>>>     .setMaster("local[*]") \
>>>     .set("spark.default.parallelism", "2")
>>>
>>> sc = CassandraSparkContext(conf=sconf)
>>> batchIntervalSeconds = 2
>>> ssc = StreamingContext(sc, batchIntervalSeconds)
>>>
>>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>>     {"metadata.broker.list": brokers})
>>>
>>> kafkaStream \
>>>     .transform(process) \
>>>     .foreachRDD(casssave)
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>> ================================================
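PS: to make my earlier suggestion concrete, dropping the transform and the rdd2.count() would reduce the streaming part of the code above to something like this (untested sketch; it reuses your recordHandler and casssave exactly as defined above, and writing an empty batch should simply insert nothing):

# Untested sketch: map each Kafka message directly and write every batch,
# skipping the count() check entirely.
kafkaStream \
    .map(lambda w: recordHandler(w[1])) \
    .foreachRDD(casssave)

Without the count, each batch is also only computed once (otherwise the count and the write each trigger the map).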
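PPS: on saveToCassandra from Python: the pyspark-cassandra README (the TargetHolding project from which the anguenot package is forked) describes a pyspark_cassandra.streaming module that adds saveToCassandra directly to DStreams. I have not tried it myself, so treat this as an unverified sketch:

# Unverified sketch: assumes pyspark_cassandra.streaming is available in
# the anguenot fork and patches saveToCassandra onto DStreams.
import pyspark_cassandra.streaming

kafkaStream \
    .map(lambda w: recordHandler(w[1])) \
    .saveToCassandra("test_hdpkns", "measurement")

If that works, the casssave/foreachRDD wrapper is not needed at all.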