The answer is most likely that when you mix Java and Python code you
incur a penalty for every object that you transform from a Java object
into a Python object (and then back again into a Java object) when data is
being passed in and out of your functions. A way around this would probably
be to use the DataFrame API where possible, which would keep the
interactions in the JVM and skip the Python-Java serialization. Using Scala
from the start, though, is a great idea. I would also probably remove the
cache from your stream, since it is probably only hurting (it adds an
additional serialization step for data that is only used once).

> The main language Spark was developed in is Scala, so all new
> features go first to Scala, then Java, and finally Python. I'm not surprised by
> the results; we've seen this at Stratio since the first versions of Spark. At
> the beginning of development, some of our engineers build the prototype in
> Python, but when it comes down to it, if it goes into production, it has to
> be rewritten in Scala or Java, usually Scala.
>> Indeed the CPU is a huge limitation. I had a lot of trouble trying to run
>> this use case in yarn-client mode. I only managed to run it in standalone
>> (local master) mode.
>> I do not have the hardware available to run this setup in a cluster yet,
>> so I decided to dig a little deeper into the implementation to see what
>> I could improve. I just finished evaluating some results.
>> If you find something wrong or odd, please let me know.
>> Following your suggestion to use "saveToCassandra" directly, I decided to
>> try Scala. Everything was implemented as similarly as possible, and
>> I was surprised by the results: the Scala implementation is much faster.
>> My current implementation is slightly different from the Python code
>> shared some emails ago, but to compare the influence of the language as
>> fairly as possible I used the following snippets:
>> # Scala implementation ------------------
>> val kstream = KafkaUtils.createDirectStream[String, String](
>>                  ssc,
>>                  LocationStrategies.PreferConsistent,
>>                  ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
>> kstream
>>            .map( x => parse(x.value) )
>>            .saveToCassandra("hdpkns", "batch_measurement")
>> # Python implementation ----------------
>> # Adapted from the previously shared code. However, instead of
>> # calculating the metrics, it just parses the messages.
>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"": brokers})
>> kafkaStream \
>>     .transform(parse) \
>>     .foreachRDD(casssave)
>> For the same streaming input, the Scala app took an average of ~1.5
>> seconds to handle each event. The Python implementation took
>> an average of ~80 seconds to handle each event (and it ran into a lot of
>> pickle concurrency access issues).
>> Note that I measured the time as the difference between event
>> generation (before publishing to Kafka) and the moment just before
>> saveToCassandra.
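The latency definition above can be made concrete with a small helper. This is a sketch, assuming each event carries a (generated_at, before_save_at) pair in seconds; the function names are hypothetical. The "growing" flag is only a rough heuristic: in Spark Streaming, latency that keeps climbing over a run usually means each batch takes longer than the batch interval and a backlog is forming.

```python
from statistics import mean


def latencies(events):
    """Latency per event, in seconds.

    Each event is a (generated_at, before_save_at) pair: the timestamp taken
    before publishing to Kafka and the one taken just before saveToCassandra.
    """
    return [before_save - generated for generated, before_save in events]


def summarize(events):
    """Mean latency, plus a rough check for a growing backlog.

    If the second half of the run is slower than the first half, batches are
    probably taking longer than the batch interval (a heuristic, not a proof).
    """
    lat = latencies(events)
    if len(lat) < 2:
        raise ValueError("need at least two events")
    half = len(lat) // 2
    early, late = mean(lat[:half]), mean(lat[half:])
    return {"mean": mean(lat), "early": early, "late": late, "growing": late > early}


if __name__ == "__main__":
    steady = [(0, 1.5), (1, 2.5), (2, 3.5), (3, 4.5)]
    print(summarize(steady))  # {'mean': 1.5, 'early': 1.5, 'late': 1.5, 'growing': False}
```

Reporting the early and late means next to the overall average makes it easier to tell a constant per-event cost from an app that is steadily falling behind.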
>> The problem in the Python implementation seems to be the delay
>> introduced by the foreachRDD(casssave) call, which only runs
>> rdd.saveToCassandra("test_hdpkns", "measurement").
>> Honestly, I was not expecting such a difference between these two versions...
>> Can you help me understand why this is happening?
>> I'm sharing my current Scala code below:
>> # Scala Snippet =========================
>> val sparkConf = new SparkConf(). // ...
>> val ssc = new StreamingContext(sparkConf, Seconds(1))
>> val sc = ssc.sparkContext
>> //...
>> val kstream = KafkaUtils.createDirectStream[String, String](
>>                  ssc,
>>                  LocationStrategies.PreferConsistent,
>>                  ConsumerStrategies.Subscribe[String, String](topic,
>> kafkaParams))
>> //...
>> // handle Kafka messages in a parallel fashion
>> val ckstream = kstream.map( x => parse(x.value) ).cache()
>> ckstream
>>               .foreachRDD( rdd => {
>>                     rdd.foreach(metrics)
>>               } )
>> ckstream
>>               .saveToCassandra("hdpkns", "microbatch_raw_measurement")
>> #=========================
>> Hi Saulo,
>> If the CPU is close to 100%, then you are hitting the limit. I don't think
>> that moving to Scala will make a difference. Both Spark and Cassandra are
>> CPU-hungry, and your setup is small in terms of CPUs. Try running Spark on
>> another (physical) machine so that the 2 cores are dedicated to Cassandra.
>>> Hi Javier,
>>> I will try to implement this in Scala then. As far as I can see in the
>>> documentation, there is no saveToCassandra in the Python interface unless
>>> you are working with DataFrames, and the kafkaStream instance does not
>>> provide methods to convert an RDD into a DataFrame.
>>> Regarding my table, it is very simple (see below). Can I change
>>> something to make it write faster?
>>> CREATE TABLE test_hdpkns.measurement (
>>>   mid bigint,
>>>   tt timestamp,
>>>   in_tt timestamp,
>>>   out_tt timestamp,
>>>   sensor_id int,
>>>   measure double,
>>>   PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
>>> ) with compact storage;
>>> The system CPU while the demo is running is almost always at 100% for
>>> both cores.
>>> Hi Saulo,
>>> I meant using this to save:
>>> But it might be slow in a different area.
>>> Another point is that Cassandra and Spark running on the same machine
>>> might compete for resources, which will slow down the inserts. You can check
>>> the CPU usage of the machine during the run. Also, the design of the table
>>> schema can make a big difference.
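One way to reason about the schema point is to split the PRIMARY KEY into its parts. In CQL, the first element of PRIMARY KEY is the partition key (a single column, or several columns in an inner parenthesised group) and the remaining columns are clustering columns. For the `test_hdpkns.measurement` table above, that makes `mid` the whole partition key, with `tt`, `sensor_id`, `in_tt` and `out_tt` as clustering columns, so how many rows share one `mid` decides how large each partition gets. The helper below is a hypothetical sketch that only handles plain column lists like the one in this thread; it is not a full CQL parser.

```python
def split_primary_key(pk):
    """Split a CQL PRIMARY KEY column list into (partition_key, clustering).

    Accepts e.g. "(mid, tt, sensor_id)" or "((mid, sensor_id), tt)".
    Hypothetical helper: no quoting, comments or nested types are handled.
    """
    inner = pk.strip()
    if not (inner.startswith("(") and inner.endswith(")")):
        raise ValueError("expected a parenthesised column list")
    inner = inner[1:-1].strip()
    if inner.startswith("("):
        # Composite partition key: everything inside the inner parentheses.
        close = inner.index(")")
        partition = [c.strip() for c in inner[1:close].split(",")]
        rest = inner[close + 1:].lstrip(" ,")
    else:
        # Simple partition key: only the first column.
        first, _, rest = inner.partition(",")
        partition = [first.strip()]
    clustering = [c.strip() for c in rest.split(",") if c.strip()]
    return partition, clustering


if __name__ == "__main__":
    print(split_primary_key("(mid, tt, sensor_id, in_tt, out_tt)"))
```

Seeing the split explicitly makes it easier to ask whether `mid` alone spreads writes evenly across partitions, or whether a composite partition key would suit the write pattern better.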
>>>> Hi Javier,
>>>> I removed the count and used "map" directly instead of using transform,
>>>> but the *kafkaStream* is created with KafkaUtils, which does not have a
>>>> method to save to Cassandra directly.
>>>> Do you know any workaround for this?
>>>> Hi Saulo,
>>>> I'm no expert, but I will give it a try.
>>>> I would remove the rdd2.count(); I can't see the point, and you will
>>>> gain performance right away. Once it is gone, I would not use transform
>>>> either, just map directly.
>>>> I have not used Python, but in Scala the Cassandra Spark connector can
>>>> save directly to Cassandra without a foreachRDD.
>>>> Finally, I would use the Spark UI to find which stage is the bottleneck
>>>> here.
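Why dropping `rdd2.count()` helps can be shown with a toy model of lazy evaluation. This is not Spark's implementation. It only mimics two facts: transformations such as `map` are lazy, and every action (`count`, a save) re-evaluates the whole lineage unless the data was cached. The `LazyDataset` class and `parse_calls` function are hypothetical, and `collect()` stands in for the save to Cassandra.

```python
class LazyDataset:
    """Toy model of an RDD: map() is lazy and every action replays the
    whole lineage unless cache() was called. Not Spark's implementation."""

    def __init__(self, source, funcs=()):
        self._source = list(source)
        self._funcs = tuple(funcs)
        self._cache_requested = False
        self._cached = None

    def map(self, f):
        # Transformations only record the function; nothing runs yet.
        return LazyDataset(self._source, self._funcs + (f,))

    def cache(self):
        self._cache_requested = True
        return self

    def _compute(self):
        if self._cached is not None:
            return self._cached
        out = []
        for rec in self._source:
            for f in self._funcs:
                rec = f(rec)
            out.append(rec)
        if self._cache_requested:
            self._cached = out
        return out

    def count(self):
        # An action: triggers a full evaluation of the lineage.
        return len(self._compute())

    def collect(self):
        # Another action; stands in for the save to Cassandra.
        return list(self._compute())


def parse_calls(messages, count_first, use_cache):
    """Return how many times the parser ran for one micro-batch."""
    calls = 0

    def parse(msg):
        nonlocal calls
        calls += 1
        return msg.split(",")

    ds = LazyDataset(messages).map(parse)
    if use_cache:
        ds = ds.cache()
    if count_first and ds.count() == 0:
        return calls
    ds.collect()
    return calls


if __name__ == "__main__":
    batch = ["1,10.5", "2,11.0", "3,9.8"]
    print(parse_calls(batch, count_first=True, use_cache=False))
    print(parse_calls(batch, count_first=False, use_cache=False))
    print(parse_calls(batch, count_first=True, use_cache=True))
```

With the count in place and no cache, every record is parsed twice per batch; removing the count (or caching) brings it back to one pass. In real Spark, caching has its own memory and serialization cost, which is the trade-off raised at the top of the thread.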
>>>>> Hi all,
>>>>> I am implementing a use case where I read some sensor data from Kafka
>>>>> with the Spark Streaming interface (*KafkaUtils.createDirectStream*) and,
>>>>> after some transformations, write the output (RDD) to Cassandra.
>>>>> Everything is working properly, but I am having some trouble with
>>>>> performance. My Kafka topic receives around 2000 messages per second. For a
>>>>> 4-minute test, the Spark Streaming app takes 6-7 minutes to process and write to
>>>>> Cassandra, which is not acceptable for longer runs.
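The numbers in the paragraph above already show why the app falls further behind the longer it runs. A 4-minute test at 2000 msg/s is 480,000 messages; finishing them in 6-7 minutes means roughly 1,140-1,330 msg/s. With the 2-second batch interval from the code at the end of this email (`batchIntervalSeconds = 2`), each batch holds about 4,000 messages and needs about 3-3.5 s to process, so every batch overruns the interval. The sketch below does this arithmetic; it assumes a constant average processing rate, and the function name is hypothetical.

```python
def throughput_check(input_rate, test_seconds, processing_seconds, batch_interval):
    """Rates in messages/second, durations in seconds.

    Assumes the app processes messages at a constant average rate.
    """
    total = input_rate * test_seconds
    achieved_rate = total / processing_seconds
    per_batch = input_rate * batch_interval
    # Seconds needed to process one batch at the achieved rate.
    batch_time = per_batch / achieved_rate
    return {
        "total_messages": total,
        "achieved_rate": achieved_rate,
        "messages_per_batch": per_batch,
        "batch_time": batch_time,
        "keeps_up": batch_time <= batch_interval,
        # Messages added to the backlog every second of the run.
        "backlog_growth": max(0.0, input_rate - achieved_rate),
    }


if __name__ == "__main__":
    for minutes in (6, 7):
        r = throughput_check(2000, 4 * 60, minutes * 60, batch_interval=2)
        print(minutes, "min:", round(r["achieved_rate"]), "msg/s,",
              round(r["batch_time"], 2), "s per 2 s batch")
```

Because the backlog grows by several hundred messages every second, a longer run only makes the total delay worse; the processing time per batch has to drop below the batch interval for the app to keep up.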
>>>>> I am running this application in a "sandbox" with 12GB of RAM, 2 cores
>>>>> and 30GB SSD space.
>>>>> Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).
>>>>> I would like to know if you have any suggestions to improve performance
>>>>> (other than getting more resources :) ).
>>>>> My code (PySpark) is posted at the end of this email so you can take a
>>>>> look. I tried some different Cassandra configurations following this link:
>>>>> (recommended on Stack Overflow for similar questions).
>>>>> =============== # CODE # =================================
>>>>> ####
>>>>> # run command:
>>>>> # spark2-submit --packages
>>>>> #   org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2
>>>>> #   --conf'localhost' --num-executors 2
>>>>> #   --executor-cores 2 localhost:6667 test_topic2
>>>>> ##
>>>>> # Run Spark imports
>>>>> import sys
>>>>> from pyspark import SparkConf # SparkContext, SparkConf
>>>>> from pyspark.streaming import StreamingContext
>>>>> from pyspark.streaming.kafka import KafkaUtils
>>>>> # Run Cassandra imports
>>>>> import pyspark_cassandra
>>>>> from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>>>>> def recordHandler(record):
>>>>>     (mid, tt, in_tt, sid, mv) = parseData( record )
>>>>>     return processMetrics(mid, tt, in_tt, sid, mv)
>>>>> def process(time, rdd):
>>>>>     # w is a (key, value) pair from Kafka; parse the message value
>>>>>     rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
>>>>>     if rdd2.count() > 0:
>>>>>         return rdd2
>>>>> def casssave(time, rdd):
>>>>>     rdd.saveToCassandra( "test_hdpkns", "measurement" )
>>>>> # ...
>>>>> brokers, topic = sys.argv[1:]
>>>>> # ...
>>>>> sconf = SparkConf() \
>>>>>         .setAppName("SensorDataStreamHandler") \
>>>>>         .setMaster("local[*]") \
>>>>>         .set("spark.default.parallelism", "2")
>>>>> sc = CassandraSparkContext(conf = sconf)
>>>>> batchIntervalSeconds = 2
>>>>> ssc = StreamingContext(sc, batchIntervalSeconds)
>>>>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"": brokers})
>>>>> kafkaStream \
>>>>>     .transform(process) \
>>>>>     .foreachRDD(casssave)
>>>>> ssc.start()
>>>>> ssc.awaitTermination()
>>>>> ================================================
