Did you use RDDs or DataFrames?
What is the Spark version?

On Mon, May 28, 2018 at 10:32 PM, Saulo Sobreiro <saulo.sobre...@outlook.pt>
wrote:

> Hi,
> I ran a few more tests and found that even with a lot more operations on
> the Scala side, Python is outperformed...
>
> Dataset stream duration: ~3 minutes (CSV-formatted data messages read from
> Kafka)
> Scala process/store time: ~3 minutes (map with split + metrics
> calculations + store raw + store metrics)
> Python process/store time: ~7 minutes (map with split + store raw)
>
> This is the difference between being usable in production or not. I get
> that Python is likely to be slower because of the Python-to-Java object
> conversions, but I was not expecting such a huge difference.
>
> These results are very interesting, as I was comparing against the time an
> "equivalent" Storm application takes to process the exact same stream
> (~3 minutes as well) for the same results, and Spark was clearly losing
> the race.
>
> Thank you all for your feedback :)
>
> Regards,
> Saulo
>
> On 21/05/2018 14:09:40, Russell Spitzer <russell.spit...@gmail.com> wrote:
> The answer is most likely that when you mix Java and Python code you incur
> a penalty for every object that is converted from a Java object into a
> Python object (and then back again) as data is passed in and out of your
> functions. A way around this would probably be to use the DataFrame API if
> possible, which would keep the interactions in the JVM and skip the
> Python-Java serialization. Using Scala from the start, though, is a great
> idea. I would also probably remove the cache from your stream, since it is
> probably only hurting (it adds an additional serialization for data that
> is only used once.)
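>
> As an illustration (a minimal, hedged sketch rather than your actual code,
> with column order and names assumed from the table definition quoted later
> in this thread): inside foreachRDD you can build a DataFrame from each
> batch and write it through the connector's DataFrame source, so the csv
> parsing and the write run in the JVM with no Python lambda applied per
> record.
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split, col
>
> spark = SparkSession.builder.getOrCreate()
>
> def save_batch(time, rdd):
>     if rdd.isEmpty():
>         return
>     # one row per raw Kafka message value; the strings cross into the JVM
>     # once, but everything after this line stays there
>     df = spark.createDataFrame(rdd.map(lambda kv: (kv[1],)), ["value"])
>     parts = split(col("value"), ",")
>     # timestamp casts assume ISO-formatted strings; adjust to your format.
>     # out_tt is omitted; add it the same way if it travels in the message.
>     df.select(
>         parts.getItem(0).cast("bigint").alias("mid"),
>         parts.getItem(1).cast("timestamp").alias("tt"),
>         parts.getItem(2).cast("timestamp").alias("in_tt"),
>         parts.getItem(3).cast("int").alias("sensor_id"),
>         parts.getItem(4).cast("double").alias("measure"),
>     ).write \
>      .format("org.apache.spark.sql.cassandra") \
>      .options(keyspace="test_hdpkns", table="measurement") \
>      .mode("append") \
>      .save()
>
> kafkaStream.foreachRDD(save_batch)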
>
> On Mon, May 21, 2018 at 5:01 AM Alonso Isidoro Roman <alons...@gmail.com>
> wrote:
>
>> The main language Spark is developed in is Scala, so all new features
>> arrive in Scala first, then Java, and finally Python. I'm not surprised by
>> the results; we have seen this at Stratio since the first versions of
>> Spark. At the beginning of development some of our engineers prototype in
>> Python, but when it comes down to it, if it goes into production it has to
>> be rewritten in Scala or Java, usually Scala.
>>
>>
>>
>> On Mon, 21 May 2018 at 4:34, Saulo Sobreiro (<
>> saulo.sobre...@outlook.pt>) wrote:
>>
>>> Hi Javier,
>>>
>>> Thank you a lot for the feedback.
>>> Indeed the CPU is a huge limitation. I ran into a lot of trouble trying
>>> to run this use case in yarn-client mode, and only managed to run it in
>>> standalone (local master) mode.
>>>
>>> I do not have the hardware available to run this setup on a cluster yet,
>>> so I decided to dig a little deeper into the implementation to see what I
>>> could improve. I just finished evaluating some results.
>>> If you find something wrong or odd, please let me know.
>>>
>>> Following your suggestion to use "saveToCassandra" directly, I decided to
>>> try Scala. Everything was implemented as similarly as possible, and I was
>>> surprised by the results: the Scala implementation is much faster.
>>>
>>> My current implementation is slightly different from the Python code
>>> shared a few emails ago, but to compare the influence of the two
>>> languages as directly as possible I used the following snippets:
>>>
>>> # Scala implementation ------------------
>>>
>>> // needs: import com.datastax.spark.connector.streaming._
>>> val kstream = KafkaUtils.createDirectStream[String, String](
>>>                  ssc,
>>>                  LocationStrategies.PreferConsistent,
>>>                  ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
>>> kstream
>>>            .map( x => parse(x.value) )
>>>            .saveToCassandra("hdpkns", "batch_measurement")
>>>
>>> # Python implementation ----------------
>>> # Adapted from the previously shared code; instead of calculating the
>>> # metrics, it just parses the messages.
>>>
>>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>>                  {"metadata.broker.list": brokers})
>>>
>>> kafkaStream \
>>>     .transform(parse) \
>>>     .foreachRDD(casssave)
>>>
>>> For the same streaming input, the Scala app took an average of ~1.5
>>> seconds to handle each event. The Python implementation took an average
>>> of ~80 seconds per event (and that was after working through a lot of
>>> pickle concurrency access issues).
>>>
>>> Note that I measured the time as the difference between the event
>>> generation (before being published to Kafka) and the moment just before
>>> the saveToCassandra call.
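>>>
>>> (A minimal sketch of that measurement, with illustrative names; it
>>> assumes the generation timestamp travels inside the message as an epoch
>>> value:)
>>>
>>> import time
>>>
>>> def handling_latency(parsed):
>>>     # parsed["tt"] is assumed to be the epoch time set at generation
>>>     return time.time() - parsed["tt"]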
>>>
>>> The problem in the Python implementation seems to be the delay introduced
>>> by the foreachRDD(casssave) call, which only runs
>>> rdd.saveToCassandra( "test_hdpkns", "measurement" ).
>>>
>>>
>>> Honestly, I was not expecting such a difference between these two
>>> implementations... Do you understand why this is happening?
>>>
>>>
>>>
>>> Again, thank you very much for your help,
>>>
>>> Best Regards
>>>
>>>
>>> Sharing my current Scala code below.
>>> # Scala Snippet =========================
>>> val sparkConf = new SparkConf(). // ...
>>> val ssc = new StreamingContext(sparkConf, Seconds(1))
>>> val sc = ssc.sparkContext
>>> //...
>>> val kstream = KafkaUtils.createDirectStream[String, String](
>>>                  ssc,
>>>                  LocationStrategies.PreferConsistent,
>>>                  ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
>>> //...
>>> // handle Kafka messages in a parallel fashion; cached because the parsed
>>> // stream is consumed twice below (metrics pass + raw save)
>>> val ckstream = kstream.map( x => parse(x.value) ).cache()
>>> ckstream
>>>               .foreachRDD( rdd => {
>>>                     rdd.foreach(metrics)
>>>               } )
>>> ckstream
>>>               .saveToCassandra("hdpkns", "microbatch_raw_measurement")
>>> #=========================
>>>
>>> On 30/04/2018 14:57:50, Javier Pareja <pareja.jav...@gmail.com> wrote:
>>> Hi Saulo,
>>>
>>> If the CPU is close to 100% then you are hitting the limit. I don't
>>> think that moving to Scala will make a difference. Both Spark and
>>> Cassandra are CPU hungry, and your setup is small in terms of CPUs. Try
>>> running Spark on another (physical) machine so that the 2 cores are
>>> dedicated to Cassandra.
>>>
>>> Kind Regards
>>> Javier
>>>
>>>
>>>
>>> On Mon, 30 Apr 2018, 14:24 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
>>> wrote:
>>>
>>>> Hi Javier,
>>>>
>>>> I will try to implement this in Scala then. As far as I can see in the
>>>> documentation, there is no saveToCassandra in the Python interface
>>>> unless you are working with DataFrames, and the kafkaStream instance
>>>> does not provide methods to convert an RDD into a DataFrame.
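>>>>
>>>> That said, the anguenot:pyspark-cassandra package from my submit command
>>>> appears to ship a streaming module that patches DStreams; assuming it
>>>> works as its README describes, something like this should save without
>>>> an explicit foreachRDD:
>>>>
>>>> import pyspark_cassandra.streaming  # adds saveToCassandra to DStreams
>>>>
>>>> kafkaStream \
>>>>     .map(lambda kv: parse(kv[1])) \
>>>>     .saveToCassandra("test_hdpkns", "measurement")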
>>>>
>>>> Regarding my table, it is very simple (see below). Can I change
>>>> something to make it write faster?
>>>> CREATE TABLE test_hdpkns.measurement (
>>>>   mid bigint,
>>>>   tt timestamp,
>>>>   in_tt timestamp,
>>>>   out_tt timestamp,
>>>>   sensor_id int,
>>>>   measure double,
>>>>   PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
>>>> ) WITH COMPACT STORAGE;
>>>>
>>>> While the demo is running, the system CPU is almost always at 100% on
>>>> both cores.
>>>>
>>>>
>>>> Thank you.
>>>>
>>>> Best Regards,
>>>>
>>>> On 29/04/2018 20:46:30, Javier Pareja <pareja.jav...@gmail.com> wrote:
>>>> Hi Saulo,
>>>>
>>>> I meant using this to save:
>>>> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#writing-to-cassandra-from-a-stream
>>>>
>>>> But it might be slow in a different area.
>>>> Another point is that Cassandra and Spark running on the same machine
>>>> might compete for resources, which will slow down the inserts. You can
>>>> check the CPU usage of the machine at the time. Also, the design of the
>>>> table schema can make a big difference.
>>>>
>>>>
>>>> On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
>>>> wrote:
>>>>
>>>>> Hi Javier,
>>>>>
>>>>>
>>>>> I dropped the transform and used *map* directly, but *kafkaStream* is
>>>>> created with KafkaUtils, which does not have a method to save to
>>>>> Cassandra directly.
>>>>>
>>>>> Do you know of any workaround for this?
>>>>>
>>>>>
>>>>> Thank you for the suggestion.
>>>>>
>>>>> Best Regards,
>>>>>
>>>>> On 29/04/2018 17:03:24, Javier Pareja <pareja.jav...@gmail.com> wrote:
>>>>> Hi Saulo,
>>>>>
>>>>> I'm no expert but I will give it a try.
>>>>> I would remove the rdd2.count(); I can't see the point of it, and you
>>>>> will gain performance right away. With that gone, I would not use a
>>>>> transform, just the map directly.
>>>>> I have not used Python, but in Scala the cassandra-spark connector can
>>>>> save directly to Cassandra without a foreachRDD.
>>>>>
>>>>> Finally, I would use the Spark UI to find which stage is the bottleneck
>>>>> here.
>>>>>
>>>>> On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I am implementing a use case where I read some sensor data from Kafka
>>>>>> with the SparkStreaming interface (*KafkaUtils.createDirectStream*)
>>>>>> and, after some transformations, write the output (RDD) to Cassandra.
>>>>>>
>>>>>> Everything is working properly but I am having some trouble with the
>>>>>> performance. My Kafka topic receives around 2000 messages per second.
>>>>>> For a 4 min. test, the SparkStreaming app takes 6~7 min. to process
>>>>>> and write to Cassandra, which is not acceptable for longer runs.
>>>>>>
>>>>>> I am running this application in a "sandbox" with 12GB of RAM, 2
>>>>>> cores and 30GB SSD space.
>>>>>> Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).
>>>>>>
>>>>>> I would like to know if you have any suggestions to improve
>>>>>> performance (other than getting more resources :) ).
>>>>>>
>>>>>> My code (pyspark) is posted at the end of this email so you can take a
>>>>>> look. I tried some different Cassandra configurations following this
>>>>>> link: http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark
>>>>>> (recommended on stackoverflow for similar questions).
>>>>>>
>>>>>>
>>>>>> Thank you in advance,
>>>>>>
>>>>>> Best Regards,
>>>>>> Saulo
>>>>>>
>>>>>>
>>>>>>
>>>>>> =============== # CODE # =================================
>>>>>> ####
>>>>>> # run command:
>>>>>> # spark2-submit \
>>>>>> #   --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2 \
>>>>>> #   --conf spark.cassandra.connection.host='localhost' \
>>>>>> #   --num-executors 2 --executor-cores 2 \
>>>>>> #   SensorDataStreamHandler.py localhost:6667 test_topic2
>>>>>> ##
>>>>>>
>>>>>> # Run Spark imports
>>>>>> import sys  # for reading the broker list and topic from sys.argv
>>>>>> from pyspark import SparkConf # SparkContext, SparkConf
>>>>>> from pyspark.streaming import StreamingContext
>>>>>> from pyspark.streaming.kafka import KafkaUtils
>>>>>>
>>>>>> # Run Cassandra imports
>>>>>> import pyspark_cassandra
>>>>>> from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>>>>>>
>>>>>> def recordHandler(record):
>>>>>>     (mid, tt, in_tt, sid, mv) = parseData( record )
>>>>>>     return processMetrics(mid, tt, in_tt, sid, mv)
>>>>>>
>>>>>> def process(time, rdd):
>>>>>>     rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
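>>>>>>     # count() forces a full pass over the batch just to test for
>>>>>>     # emptiness, and returning None for an empty batch will break the
>>>>>>     # downstream transform chain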
>>>>>>     if rdd2.count() > 0:
>>>>>>         return rdd2
>>>>>>
>>>>>> def casssave(time, rdd):
>>>>>>     rdd.saveToCassandra( "test_hdpkns", "measurement" )
>>>>>>
>>>>>> # ...
>>>>>> brokers, topic = sys.argv[1:]
>>>>>>
>>>>>> # ...
>>>>>>
>>>>>> sconf = SparkConf() \
>>>>>>         .setAppName("SensorDataStreamHandler") \
>>>>>>         .setMaster("local[*]") \
>>>>>>         .set("spark.default.parallelism", "2")
>>>>>>
>>>>>> sc = CassandraSparkContext(conf = sconf)
>>>>>> batchIntervalSeconds = 2
>>>>>> ssc = StreamingContext(sc, batchIntervalSeconds)
>>>>>>
>>>>>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>>>>> {"metadata.broker.list": brokers})
>>>>>>
>>>>>> kafkaStream \
>>>>>>     .transform(process) \
>>>>>>     .foreachRDD(casssave)
>>>>>>
>>>>>> ssc.start()
>>>>>> ssc.awaitTermination()
>>>>>>
>>>>>> ================================================
>>>>>>
>>
>> --
>> Alonso Isidoro Roman
>> https://about.me/alonso.isidoro.roman
>>
>
