The answer is most likely that when you mix Java and Python code you
incur a penalty for every object that you transform from a Java object
into a Python object (and then back again into a Java object) as data is
passed in and out of your functions. A way around this would probably
have been to use the DataFrame API where possible, which keeps the
work inside the JVM and skips the Python-Java serialization. Using Scala
from the start, though, is a great idea. I would also probably remove the
cache() from your stream, since it is probably only hurting (it adds an
extra serialization step for data that is only used once).
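To make that penalty concrete, here is a minimal stdlib-only sketch that models the JVM/Python hop. It is not PySpark's real code path: PySpark batches records and the JVM side uses its own unpickler, and `transform`, `in_process` and `across_boundary` are hypothetical names. Each batch is pickled on the way in, unpickled for the Python function, and the results are pickled again on the way out, so the answer is the same but extra encode/decode work is done for every record.

```python
import pickle

def transform(record):
    """The 'real' work: hypothetical stand-in for parse(), on a (sensor_id, value) pair."""
    sensor_id, value = record
    return (sensor_id, float(value) * 2.0)

def in_process(records):
    """Everything stays in one runtime: no serialization at all."""
    return [transform(r) for r in records]

def across_boundary(records, batch_size=100):
    """Model of JVM -> Python worker -> JVM: pickle, unpickle, work, pickle back."""
    results, wire_bytes = [], 0
    for start in range(0, len(records), batch_size):
        batch_in = pickle.dumps(records[start:start + batch_size])  # "JVM" -> worker
        wire_bytes += len(batch_in)
        out = [transform(r) for r in pickle.loads(batch_in)]       # worker does the work
        batch_out = pickle.dumps(out)                               # worker -> "JVM"
        wire_bytes += len(batch_out)
        results.extend(pickle.loads(batch_out))                     # "JVM" decodes results
    return results, wire_bytes

records = [(i % 10, str(i)) for i in range(1000)]
plain = in_process(records)
crossed, nbytes = across_boundary(records)
print(plain == crossed, nbytes)
```

The two results are identical; everything `across_boundary` does beyond calling `transform` is pure overhead that grows with the number of records, which is the cost a DataFrame or Scala pipeline that never leaves the JVM avoids.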

On Mon, May 21, 2018 at 5:01 AM Alonso Isidoro Roman <alons...@gmail.com>
wrote:

> Spark was developed mainly in Scala, so new features arrive first in
> Scala, then Java, and finally Python. I'm not surprised by the results;
> we have seen this at Stratio since the first versions of Spark. At the
> beginning of development some of our engineers build prototypes in
> Python, but when it comes down to it, if the code goes into production it
> has to be rewritten in Scala or Java, usually Scala.
>
>
>
> On Mon, 21 May 2018 at 4:34, Saulo Sobreiro (<
> saulo.sobre...@outlook.pt>) wrote:
>
>> Hi Javier,
>>
>> Thank you very much for the feedback.
>> Indeed, the CPU is a huge limitation. I had a lot of trouble trying to run
>> this use case in yarn-client mode; I only managed to run it in standalone
>> (local master) mode.
>>
>> I do not have the hardware to run this setup on a cluster yet,
>> so I decided to dig a little deeper into the implementation to see what
>> I could improve. I have just finished evaluating some results.
>> If you find anything wrong or odd, please let me know.
>>
>> Following your suggestion to use "saveToCassandra" directly, I decided to
>> try Scala. Everything was implemented as similarly as possible, and
>> I was surprised by the results: the Scala implementation is much faster.
>>
>> My current implementation is slightly different from the Python code
>> shared a few emails ago, but to compare the influence of the language as
>> fairly as possible I used the following snippets:
>>
>> # Scala implementation ------------------
>>
>> val kstream = KafkaUtils.createDirectStream[String, String](
>>                  ssc,
>>                  LocationStrategies.PreferConsistent,
>>                  ConsumerStrategies.Subscribe[String, String](topic,
>> kafkaParams))
>> kstream
>>            .map( x => parse(x.value) )
>>            .saveToCassandra("hdpkns", "batch_measurement")
>>
>> # Python implementation ----------------
>> # Adapted from the previously shared code. However, instead of
>> # calculating the metrics, it just parses the messages.
>>
>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>                 {"metadata.broker.list": brokers})
>>
>> kafkaStream \
>>     .transform(parse) \
>>     .foreachRDD(casssave)
>>
>> For the same streaming input, the Scala app took an average of ~1.5
>> seconds to handle each event. The Python implementation took an
>> average of ~80 seconds to handle each event (and it ran into a lot of
>> pickle concurrency access issues).
>>
>> Note that I measured the time as the difference between the event
>> generation (before it is published to Kafka) and the moment just before the
>> saveToCassandra call.
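A minimal sketch of that measurement, assuming each message carries the epoch timestamp stamped by the producer before publishing (the helpers `event_latency` and `average_latency` are hypothetical, not part of the code in this thread): latency is taken just before the save call and averaged over all events.

```python
import time
from statistics import mean

def event_latency(generated_at, now=None):
    """Seconds between event generation (stamped by the producer before
    publishing to Kafka) and the moment just before the Cassandra save."""
    if now is None:
        now = time.time()
    return now - generated_at

def average_latency(pairs):
    """pairs: iterable of (generated_at, measured_at) epoch seconds."""
    return mean(event_latency(g, m) for g, m in pairs)

# Made-up timestamps: three events measured 1.0 s, 1.5 s and 2.0 s after generation.
sample = [(100.0, 101.0), (200.0, 201.5), (300.0, 302.0)]
print(average_latency(sample))
```

Because the two timestamps come from different processes (producer and Spark executor), their clocks need to be synchronized, for example via NTP; otherwise clock skew ends up inside the measured latency.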
>>
>> The problem in the Python implementation seems to be the delay
>> introduced by the foreachRDD(casssave) call, which only runs
>> rdd.saveToCassandra("test_hdpkns", "measurement").
>>
>>
>> Honestly, I was not expecting such a difference between these two versions...
>> Do you understand why this is happening?
>>
>>
>>
>> Again, thank you very much for your help,
>>
>> Best Regards
>>
>>
>> Sharing my current Scala code below
>> # Scala Snippet =========================
>> val sparkConf = new SparkConf(). // ...
>> val ssc = new StreamingContext(sparkConf, Seconds(1))
>> val sc = ssc.sparkContext
>> //...
>> val kstream = KafkaUtils.createDirectStream[String, String](
>>                  ssc,
>>                  LocationStrategies.PreferConsistent,
>>                  ConsumerStrategies.Subscribe[String, String](topic,
>> kafkaParams))
>> //...
>> // handle Kafka messages in a parallel fashion
>> val ckstream = kstream.map( x => parse(x.value) ).cache()
>> ckstream
>>               .foreachRDD( rdd => {
>>                     rdd.foreach(metrics)
>>               } )
>> ckstream
>>               .saveToCassandra("hdpkns", "microbatch_raw_measurement")
>> #=========================
>>
>> On 30/04/2018 14:57:50, Javier Pareja <pareja.jav...@gmail.com> wrote:
>> Hi Saulo,
>>
>> If the CPU is close to 100% then you are hitting the limit. I don't think
>> that moving to Scala will make a difference. Both Spark and Cassandra are
>> CPU hungry, your setup is small in terms of CPUs. Try running Spark on
>> another (physical) machine so that the 2 cores are dedicated to Cassandra.
>>
>> Kind Regards
>> Javier
>>
>>
>>
>> On Mon, 30 Apr 2018, 14:24 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
>> wrote:
>>
>>> Hi Javier,
>>>
>>> I will try to implement this in Scala then. As far as I can see in the
>>> documentation, there is no saveToCassandra in the Python interface unless
>>> you are working with DataFrames, and the kafkaStream instance does not
>>> provide methods to convert an RDD into a DataFrame.
>>>
>>> Regarding my table, it is very simple (see below). Can I change
>>> something to make it write faster?
>>> CREATE TABLE test_hdpkns.measurement (
>>>   mid bigint,
>>>   tt timestamp,
>>>   in_tt timestamp,
>>>   out_tt timestamp,
>>>   sensor_id int,
>>>   measure double,
>>>   PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
>>> ) with compact storage;
>>>
>>> The system CPU while the demo is running is almost always at 100% for
>>> both cores.
>>>
>>>
>>> Thank you.
>>>
>>> Best Regards,
>>>
>>> On 29/04/2018 20:46:30, Javier Pareja <pareja.jav...@gmail.com> wrote:
>>> Hi Saulo,
>>>
>>> I meant using this to save:
>>>
>>> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#writing-to-cassandra-from-a-stream
>>>
>>> But it might be slow in a different area.
>>> Another point is that Cassandra and Spark running on the same machine
>>> might compete for resources, which will slow down the inserts. You can check
>>> the CPU usage of the machine at the time. The design of the table
>>> schema can also make a big difference.
>>>
>>>
>>> On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
>>> wrote:
>>>
>>>> Hi Javier,
>>>>
>>>>
>>>> I removed the count() and used "map" directly instead of transform,
>>>> but the *kafkaStream* is created with KafkaUtils, which does not have a
>>>> method to save to Cassandra directly.
>>>>
>>>> Do you know any workaround for this?
>>>>
>>>>
>>>> Thank you for the suggestion.
>>>>
>>>> Best Regards,
>>>>
>>>> On 29/04/2018 17:03:24, Javier Pareja <pareja.jav...@gmail.com> wrote:
>>>> Hi Saulo,
>>>>
>>>> I'm no expert, but I will give it a try.
>>>> I would remove the rdd2.count(): I can't see the point of it, and you will
>>>> gain performance right away. Because of this, I would not use a transform
>>>> either, just the map directly.
>>>> I have not used Python, but in Scala the Spark Cassandra Connector can
>>>> save directly to Cassandra without a foreachRDD.
>>>>
>>>> Finally, I would use the Spark UI to find which stage is the bottleneck
>>>> here.
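The reason rdd2.count() costs performance is that RDDs are lazy: each action re-runs the lineage that produced the RDD unless it is persisted, so calling count() and then saving parses every record twice and launches an extra job. The toy class below is a stdlib-only model of that behaviour, not Spark's API (`LazyRDD`, `parse` and `parse_calls` are hypothetical names).

```python
class LazyRDD:
    """Toy model of an RDD: it stores a recipe, not data.
    Every action re-runs the recipe from the source (nothing is cached)."""

    def __init__(self, source, fn=None):
        self.source = source
        self.fn = fn

    def map(self, fn):
        # Compose with any earlier map so chained maps still work.
        prev = self.fn
        return LazyRDD(self.source, (lambda x: fn(prev(x))) if prev else fn)

    def _compute(self):
        return [self.fn(x) for x in self.source] if self.fn else list(self.source)

    def count(self):
        # Action: runs the whole recipe just to get a number.
        return len(self._compute())

    def collect(self):
        # Action: runs the whole recipe again.
        return self._compute()


parse_calls = 0

def parse(msg):
    """Hypothetical stand-in for recordHandler/parse; counts its invocations."""
    global parse_calls
    parse_calls += 1
    return msg.upper()

messages = ["a", "b", "c"]

# Pattern from the original process(): count() first, then save.
rdd2 = LazyRDD(messages).map(parse)
if rdd2.count() > 0:
    rdd2.collect()              # stands in for saveToCassandra
with_count = parse_calls        # every message parsed twice

# Same pipeline without the count().
parse_calls = 0
LazyRDD(messages).map(parse).collect()
without_count = parse_calls
print(with_count, without_count)
```

In real Spark, caching rdd2 would avoid the second parse at the cost of memory, but simply dropping the count() avoids the extra job entirely. If an emptiness check is really needed, `rdd.isEmpty()` only has to find a single element rather than count them all.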
>>>>
>>>> On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am implementing a use case where I read some sensor data from Kafka
>>>>> with the Spark Streaming interface (*KafkaUtils.createDirectStream*) and,
>>>>> after some transformations, write the output (RDD) to Cassandra.
>>>>>
>>>>> Everything is working properly, but I am having some trouble with
>>>>> performance. My Kafka topic receives around 2000 messages per second. For
>>>>> a 4 min. test, the Spark Streaming app takes 6~7 min. to process and write
>>>>> the data to Cassandra, which is not acceptable for longer runs.
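A back-of-the-envelope check using only the numbers above and the 2-second batch interval from the code at the end of this email. The queueing model is a simplifying assumption: batches run one at a time (Spark Streaming's default), each takes the same time, and Kafka fetch and task scheduling overheads are ignored. The helper names are hypothetical.

```python
def effective_throughput(rate_in, test_seconds, wall_seconds):
    """Messages/s actually sustained if a test producing rate_in msg/s
    for test_seconds takes wall_seconds to drain."""
    return rate_in * test_seconds / wall_seconds

def scheduling_delays(n_batches, batch_interval, processing_time):
    """Wait time of each micro-batch when batches run one after another:
    if a batch takes longer than the interval, every later batch waits more."""
    delays, finish = [], 0.0
    for k in range(n_batches):
        arrival = k * batch_interval
        start = max(arrival, finish)       # cannot start before the previous batch ends
        delays.append(start - arrival)
        finish = start + processing_time
    return delays

rate, test_s, batch_interval = 2000, 240, 2      # figures from this email and the code
for wall_s in (360, 420):                        # "6~7 min."
    tput = effective_throughput(rate, test_s, wall_s)
    per_batch = rate * batch_interval / tput     # seconds to process one ~4000-message batch
    delays = scheduling_delays(test_s // batch_interval, batch_interval, per_batch)
    print(f"{wall_s}s: {tput:.0f} msg/s, {per_batch:.1f}s per batch, "
          f"last batch waited {delays[-1]:.0f}s")
```

A sustained rate of roughly 1140 to 1330 msg/s means each 2-second batch takes about 3.0 to 3.5 s, so under this model the backlog grows by 1.0 to 1.5 s per batch and 120 batches take 360 to 420 s, matching the observed 6~7 minutes. The Streaming tab of the Spark UI (processing time vs. scheduling delay) is a good place to confirm whether this is what is happening.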
>>>>>
>>>>> I am running this application in a "sandbox" with 12GB of RAM, 2 cores
>>>>> and 30GB SSD space.
>>>>> Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).
>>>>>
>>>>> I would like to know if you have any suggestions to improve performance
>>>>> (other than getting more resources :) ).
>>>>>
>>>>> My code (pyspark) is posted at the end of this email so you can take a
>>>>> look. I tried some different Cassandra configurations following this link:
>>>>> http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark
>>>>> (recommended on Stack Overflow for similar questions).
>>>>>
>>>>>
>>>>> Thank you in advance,
>>>>>
>>>>> Best Regards,
>>>>> Saulo
>>>>>
>>>>>
>>>>>
>>>>> =============== # CODE # =================================
>>>>> ####
>>>>> # run command:
>>>>> # spark2-submit --packages
>>>>> #   org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2
>>>>> #   --conf spark.cassandra.connection.host='localhost' --num-executors 2
>>>>> #   --executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2
>>>>> ##
>>>>>
>>>>> # Run Spark imports
>>>>> import sys  # needed for sys.argv below
>>>>> from pyspark import SparkConf # SparkContext, SparkConf
>>>>> from pyspark.streaming import StreamingContext
>>>>> from pyspark.streaming.kafka import KafkaUtils
>>>>>
>>>>> # Run Cassandra imports
>>>>> import pyspark_cassandra
>>>>> from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>>>>>
>>>>> def recordHandler(record):
>>>>>     (mid, tt, in_tt, sid, mv) = parseData( record )
>>>>>     return processMetrics(mid, tt, in_tt, sid, mv)
>>>>>
>>>>> def process(time, rdd):
>>>>>     rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
>>>>>     if rdd2.count() > 0:
>>>>>         return rdd2
>>>>>
>>>>> def casssave(time, rdd):
>>>>>     rdd.saveToCassandra( "test_hdpkns", "measurement" )
>>>>>
>>>>> # ...
>>>>> brokers, topic = sys.argv[1:]
>>>>>
>>>>> # ...
>>>>>
>>>>> sconf = SparkConf() \
>>>>>         .setAppName("SensorDataStreamHandler") \
>>>>>         .setMaster("local[*]") \
>>>>>         .set("spark.default.parallelism", "2")
>>>>>
>>>>> sc = CassandraSparkContext(conf = sconf)
>>>>> batchIntervalSeconds = 2
>>>>> ssc = StreamingContext(sc, batchIntervalSeconds)
>>>>>
>>>>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>>>> {"metadata.broker.list": brokers})
>>>>>
>>>>> kafkaStream \
>>>>>     .transform(process) \
>>>>>     .foreachRDD(casssave)
>>>>>
>>>>> ssc.start()
>>>>> ssc.awaitTermination()
>>>>>
>>>>> ================================================
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>
> --
> Alonso Isidoro Roman
