Hi Saulo,

If the CPU is close to 100% then you are hitting the limit. I don't think
that moving to Scala will make a difference. Both Spark and Cassandra are
CPU hungry, your setup is small in terms of CPUs. Try running Spark on
another (physical) machine so that the 2 cores are dedicated to Cassandra.

Kind Regards
Javier



On Mon, 30 Apr 2018, 14:24 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
wrote:

> Hi Javier,
>
> I will try to implement this in scala then. As far as I can see in the
> documentation there is no SaveToCassandra in the python interface unless
> you are working with dataframes and the kafkaStream instance does not
> provide methods to convert an RDD into DF.
>
> Regarding my table, it is very simple (see below). Can I change something
> to make it write faster?
> CREATE TABLE test_hdpkns.measurement (
>   mid bigint,
>   tt timestamp,
>   in_tt timestamp,
>   out_tt timestamp,
>   sensor_id int,
>   measure double,
>   PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
> ) with compact storage;
>
> The system CPU while the demo is running is almost always at 100% for both
> cores.
>
>
> Thank you.
>
> Best Regards,
>
> On 29/04/2018 20:46:30, Javier Pareja <pareja.jav...@gmail.com> wrote:
> Hi Saulo,
>
> I meant using this to save:
>
> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#writing-to-cassandra-from-a-stream
>
> But it might be slow on a different area.
> Another point is that Cassandra and spark running on the same machine
> might compete for resources which will slow down the insert. You can check
> the CPU usage of the machine at the time. Also the design of the table
> schema can make a big difference.
>
>
> On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
> wrote:
>
>> Hi Javier,
>>
>>
>> I removed the map and used "map" directly instead of using transform, but
>> the *kafkaStream* is created with KafkaUtils which does not have a
>> method to save to cassandra directly.
>>
>> Do you know any workarround for this?
>>
>>
>> Thank you for the suggestion.
>>
>> Best Regards,
>>
>> On 29/04/2018 17:03:24, Javier Pareja <pareja.jav...@gmail.com> wrote:
>> Hi Saulo,
>>
>> I'm no expert but I will give it a try.
>> I would remove the rdd2.count(), I can't see the point and you will gain
>> performance right away. Because of this, I would not use a transform, just
>> directly the map.
>> I have not used python but in Scala the cassandra-spark connector can
>> save directly to Cassandra without a foreachRDD.
>>
>> Finally I would use the spark UI to find which stage is the bottleneck
>> here.
>>
>> On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
>> wrote:
>>
>>> Hi all,
>>>
>>> I am implementing a use case where I read some sensor data from Kafka
>>> with SparkStreaming interface (*KafkaUtils.createDirectStream*) and,
>>> after some transformations, write the output (RDD) to Cassandra.
>>>
>>> Everything is working properly but I am having some trouble with the
>>> performance. My kafka topic receives around 2000 messages per second. For a
>>> 4 min. test, the SparkStreaming app takes 6~7 min. to process and write to
>>> Cassandra, which is not acceptable for longer runs.
>>>
>>> I am running this application in a "sandbox" with 12GB of RAM, 2 cores
>>> and 30GB SSD space.
>>> Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).
>>>
>>> I would like to know you have some suggestion to improve performance
>>> (other than getting more resources :) ).
>>>
>>> My code (pyspark) is posted in the end of this email so you can take a
>>> look. I tried some different cassandra configurations following this link:
>>> http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark
>>> (recommended in stackoverflow for similar questions).
>>>
>>>
>>> Thank you in advance,
>>>
>>> Best Regards,
>>> Saulo
>>>
>>>
>>>
>>> =============== # CODE # =================================
>>> ####
>>> # run command:
>>> # spark2-submit --packages
>>> org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2
>>>  --conf spark.cassandra.connection.host='localhost' --num-executors 2
>>> --executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2
>>> ##
>>>
>>> # Run Spark imports
>>> from pyspark import SparkConf # SparkContext, SparkConf
>>> from pyspark.streaming import StreamingContext
>>> from pyspark.streaming.kafka import KafkaUtils
>>>
>>> # Run Cassandra imports
>>> import pyspark_cassandra
>>> from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>>>
>>> def recordHandler(record):
>>>     (mid, tt, in_tt, sid, mv) = parseData( record )
>>>     return processMetrics(mid, tt, in_tt, sid, mv)
>>>
>>> def process(time, rdd):
>>>     rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
>>>     if rdd2.count() > 0:
>>>         return rdd2
>>>
>>> def casssave(time, rdd):
>>>     rdd.saveToCassandra( "test_hdpkns", "measurement" )
>>>
>>> # ...
>>> brokers, topic = sys.argv[1:]
>>>
>>> # ...
>>>
>>> sconf = SparkConf() \
>>>         .setAppName("SensorDataStreamHandler") \
>>>         .setMaster("local[*]") \
>>>         .set("spark.default.parallelism", "2")
>>>
>>> sc = CassandraSparkContext(conf = sconf)
>>> batchIntervalSeconds = 2
>>> ssc = StreamingContext(sc, batchIntervalSeconds)
>>>
>>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>> {"metadata.broker.list": brokers})
>>>
>>> kafkaStream \
>>>     .transform(process) \
>>>     .foreachRDD(casssave)
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>> ================================================
>>>
>>>
>>>
>>>
>>>

Reply via email to