Did you use RDDs or DataFrames? What is the Spark version?

On Mon, May 28, 2018 at 10:32 PM, Saulo Sobreiro <saulo.sobre...@outlook.pt> wrote:
> Hi,
>
> I ran a few more tests and found that, even with many more operations on
> the Scala side, Python is still outperformed:
>
> Dataset stream duration:   ~3 minutes (CSV-formatted data messages read from Kafka)
> Scala process/store time:  ~3 minutes (map with split + metrics calculations + store raw + store metrics)
> Python process/store time: ~7 minutes (map with split + store raw)
>
> This is the difference between being usable in production or not. I get
> that Python is likely to be slower because of the Python-Java object
> conversions, but I was not expecting such a huge difference.
>
> These results are very interesting because I was comparing them with the
> time an "equivalent" application in Storm takes to process the exact same
> stream (~3 minutes as well) for the same results, and Spark was clearly
> losing the race.
>
> Thank you all for your feedback :)
>
> Regards,
> Saulo
>
> On 21/05/2018 14:09:40, Russell Spitzer <russell.spit...@gmail.com> wrote:
> The answer is most likely that when you mix Java and Python code you
> incur a penalty for every object that you convert from a Java object
> into a Python object (and then back again into a Java object) when data is
> passed in and out of your functions. A way around this would probably
> have been to use the DataFrame API where possible, which keeps those
> operations on the Java side and skips Python-Java serialization. Using
> Scala from the start, though, is a great idea. I would also probably remove
> the cache from your stream, since it is probably only hurting (it adds an
> extra serialization step for data that is used only once).
>
> On Mon, May 21, 2018 at 5:01 AM Alonso Isidoro Roman <alons...@gmail.com> wrote:
>
>> The main language Spark is developed in is Scala, so new features arrive
>> first in Scala, then Java, and finally Python. I'm not surprised by the
>> results; we at Stratio have seen the same since the first versions of Spark.
>> At the beginning of development, some of our engineers build the prototype
>> in Python, but when it comes down to it, if it goes into production, it has
>> to be rewritten in Scala or Java, usually Scala.
>>
>> On Mon, 21 May 2018 at 4:34, Saulo Sobreiro (<saulo.sobre...@outlook.pt>) wrote:
>>
>>> Hi Javier,
>>>
>>> Thank you very much for the feedback.
>>> Indeed, the CPU is a huge limitation. I had a lot of trouble trying to
>>> run this use case in yarn-client mode. I only managed to run it in
>>> standalone (local master) mode.
>>>
>>> I do not have the hardware available to run this setup on a cluster yet,
>>> so I decided to dig a little deeper into the implementation to see what
>>> I could improve. I just finished evaluating some results.
>>> If you find something wrong or odd, please let me know.
>>>
>>> Following your suggestion to use "saveToCassandra" directly, I decided to
>>> try Scala. Everything was implemented in the most similar way possible, and
>>> I was surprised by the results: the Scala implementation is much faster.
>>>
>>> My current implementation is slightly different from the Python code
>>> shared some emails ago, but to compare the influence of the language as
>>> fairly as possible I used the following snippets:
>>>
>>> # Scala implementation ------------------
>>>
>>> val kstream = KafkaUtils.createDirectStream[String, String](
>>>   ssc,
>>>   LocationStrategies.PreferConsistent,
>>>   ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
>>> kstream
>>>   .map( x => parse(x.value) )
>>>   .saveToCassandra("hdpkns", "batch_measurement")
>>>
>>> # Python implementation ----------------
>>> # Adapted from the previously shared code. However, instead of
>>> # calculating the metrics, it just parses the messages.
>>>
>>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>>                                             {"metadata.broker.list": brokers})
>>>
>>> kafkaStream \
>>>     .transform(parse) \
>>>     .foreachRDD(casssave)
>>>
>>> For the same streaming input, the Scala app took an average of ~1.5
>>> seconds to handle each event. The Python implementation took an average
>>> of ~80 seconds to handle each event (and that was after a lot of pickle
>>> concurrent-access issues).
>>>
>>> Note that I measured the time as the difference between the event
>>> generation (before being published to Kafka) and the moment just before
>>> saveToCassandra.
>>>
>>> The problem in the Python implementation seems to be the delay
>>> introduced by the foreachRDD(casssave) call, which only runs
>>> rdd.saveToCassandra("test_hdpkns", "measurement").
>>>
>>> Honestly, I was not expecting such a difference between these two
>>> snippets... Do you understand why this is happening?
>>>
>>> Again, thank you very much for your help,
>>>
>>> Best Regards
>>>
>>> Sharing my current Scala code below:
>>> # Scala Snippet =========================
>>> val sparkConf = new SparkConf(). // ...
>>> val ssc = new StreamingContext(sparkConf, Seconds(1))
>>> val sc = ssc.sparkContext
>>> //...
>>> val kstream = KafkaUtils.createDirectStream[String, String](
>>>   ssc,
>>>   LocationStrategies.PreferConsistent,
>>>   ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
>>> //...
>>> // handle Kafka messages in a parallel fashion
>>> val ckstream = kstream.map( x => parse(x.value) ).cache()
>>> ckstream
>>>   .foreachRDD( rdd => {
>>>     rdd.foreach(metrics)
>>>   } )
>>> ckstream
>>>   .saveToCassandra("hdpkns", "microbatch_raw_measurement")
>>> #=========================
>>>
>>> On 30/04/2018 14:57:50, Javier Pareja <pareja.jav...@gmail.com> wrote:
>>> Hi Saulo,
>>>
>>> If the CPU is close to 100%, then you are hitting the limit.
>>> I don't think that moving to Scala will make a difference. Both Spark and
>>> Cassandra are CPU-hungry, and your setup is small in terms of CPUs. Try
>>> running Spark on another (physical) machine so that the 2 cores are
>>> dedicated to Cassandra.
>>>
>>> Kind Regards
>>> Javier
>>>
>>> On Mon, 30 Apr 2018, 14:24 Saulo Sobreiro, <saulo.sobre...@outlook.pt> wrote:
>>>
>>>> Hi Javier,
>>>>
>>>> I will try to implement this in Scala, then. As far as I can see in the
>>>> documentation, there is no saveToCassandra in the Python interface unless
>>>> you are working with DataFrames, and the kafkaStream instance does not
>>>> provide methods to convert an RDD into a DataFrame.
>>>>
>>>> Regarding my table, it is very simple (see below). Can I change
>>>> something to make it write faster?
>>>>
>>>> CREATE TABLE test_hdpkns.measurement (
>>>>   mid bigint,
>>>>   tt timestamp,
>>>>   in_tt timestamp,
>>>>   out_tt timestamp,
>>>>   sensor_id int,
>>>>   measure double,
>>>>   PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
>>>> ) with compact storage;
>>>>
>>>> While the demo is running, the system CPU is almost always at 100% on
>>>> both cores.
>>>>
>>>> Thank you.
>>>>
>>>> Best Regards,
>>>>
>>>> On 29/04/2018 20:46:30, Javier Pareja <pareja.jav...@gmail.com> wrote:
>>>> Hi Saulo,
>>>>
>>>> I meant using this to save:
>>>> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#writing-to-cassandra-from-a-stream
>>>>
>>>> But it might be slow in a different area.
>>>> Another point is that Cassandra and Spark running on the same machine
>>>> might compete for resources, which will slow down the inserts. You can
>>>> check the CPU usage of the machine at the time. Also, the design of the
>>>> table schema can make a big difference.
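Javier's remark about schema design can be made concrete. In `test_hdpkns.measurement`, `mid` alone is the partition key, so only rows that share a `mid` can be grouped into the same write batch. The following is a minimal pure-Python sketch, not the Spark Cassandra Connector's code: `Measurement`, `group_by_partition`, `max_rows` and the sample data are hypothetical. The connector's own batch grouping is configured through settings such as `spark.cassandra.output.batch.grouping.key`.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, List, NamedTuple


class Measurement(NamedTuple):
    """One row of test_hdpkns.measurement (assumed Python-side shape)."""
    mid: int          # partition key
    tt: datetime
    in_tt: datetime
    out_tt: datetime
    sensor_id: int
    measure: float


def group_by_partition(
    rows: Iterable[Measurement], max_rows: int
) -> Dict[int, List[List[Measurement]]]:
    """Group rows by partition key (mid), then split each group into
    batches of at most max_rows rows. Rows with different mid values
    never share a batch."""
    if max_rows < 1:
        raise ValueError("max_rows must be >= 1")
    by_mid: Dict[int, List[Measurement]] = defaultdict(list)
    for row in rows:
        by_mid[row.mid].append(row)
    return {
        mid: [group[i:i + max_rows] for i in range(0, len(group), max_rows)]
        for mid, group in by_mid.items()
    }


# Hypothetical sample: 5 readings spread over 2 partition keys.
t0 = datetime(2018, 4, 30, 14, 0, 0)
rows = [Measurement(i % 2, t0, t0, t0, i, float(i)) for i in range(5)]
batches = group_by_partition(rows, max_rows=2)
for mid, groups in sorted(batches.items()):
    print(mid, [[r.sensor_id for r in g] for g in groups])
# 0 [[0, 2], [4]]
# 1 [[1, 3]]
```

If every Kafka message carries its own `mid`, each partition holds a single row and grouping cannot help. The thread does not say how `mid` values are distributed.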
>>>>
>>>> On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro, <saulo.sobre...@outlook.pt> wrote:
>>>>
>>>>> Hi Javier,
>>>>>
>>>>> I dropped the transform and used "map" directly, but the *kafkaStream*
>>>>> is created with KafkaUtils, which does not have a method to save to
>>>>> Cassandra directly.
>>>>>
>>>>> Do you know any workaround for this?
>>>>>
>>>>> Thank you for the suggestion.
>>>>>
>>>>> Best Regards,
>>>>>
>>>>> On 29/04/2018 17:03:24, Javier Pareja <pareja.jav...@gmail.com> wrote:
>>>>> Hi Saulo,
>>>>>
>>>>> I'm no expert, but I will give it a try.
>>>>> I would remove the rdd2.count(); I can't see the point, and you will
>>>>> gain performance right away. Because of this, I would not use a
>>>>> transform, just the map directly.
>>>>> I have not used Python, but in Scala the Spark Cassandra Connector can
>>>>> save directly to Cassandra without a foreachRDD.
>>>>>
>>>>> Finally, I would use the Spark UI to find which stage is the bottleneck
>>>>> here.
>>>>>
>>>>> On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <saulo.sobre...@outlook.pt> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I am implementing a use case where I read some sensor data from Kafka
>>>>>> with the Spark Streaming interface (*KafkaUtils.createDirectStream*)
>>>>>> and, after some transformations, write the output (RDD) to Cassandra.
>>>>>>
>>>>>> Everything is working properly, but I am having some trouble with
>>>>>> performance. My Kafka topic receives around 2000 messages per second.
>>>>>> For a 4-minute test, the Spark Streaming app takes 6-7 minutes to
>>>>>> process and write to Cassandra, which is not acceptable for longer runs.
>>>>>>
>>>>>> I am running this application in a "sandbox" with 12GB of RAM, 2
>>>>>> cores and 30GB of SSD space.
>>>>>> Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).
>>>>>>
>>>>>> I would like to know if you have any suggestions to improve performance
>>>>>> (other than getting more resources :) ).
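The figures in this first message already show the app falling behind its input. The sketch below is a back-of-the-envelope check that uses only numbers from the thread: 2000 messages/s, a 4-minute test, 6-7 minutes to finish, and `batchIntervalSeconds = 2` from the posted code. It assumes a constant input rate and that Spark Streaming runs one batch at a time. The function names are made up for illustration.

```python
from typing import Tuple


def sustained_rate(input_rate: float, test_s: float, finish_s: float) -> float:
    """Average messages/s actually processed, assuming the whole test
    input (input_rate * test_s messages) was drained by finish_s."""
    return input_rate * test_s / finish_s


def batch_time(batch_interval_s: float, input_rate: float, rate: float) -> float:
    """Seconds needed to process one batch interval's worth of input."""
    return batch_interval_s * input_rate / rate


def simulate(n_batches: int, batch_interval_s: float, batch_s: float) -> Tuple[float, float]:
    """Run batches one at a time (assumed: no concurrent batch jobs).
    Returns (scheduling delay of the last batch, time the last batch finishes)."""
    finish = 0.0
    delay = 0.0
    for i in range(n_batches):
        arrival = i * batch_interval_s
        start = max(arrival, finish)   # wait for the previous batch to finish
        delay = start - arrival
        finish = start + batch_s
    return delay, finish


INPUT_RATE = 2000.0   # messages/s (from the thread)
TEST_S = 4 * 60       # 4-minute test
INTERVAL_S = 2.0      # batchIntervalSeconds in the posted code

for finish_min in (6, 7):
    rate = sustained_rate(INPUT_RATE, TEST_S, finish_min * 60)
    per_batch = batch_time(INTERVAL_S, INPUT_RATE, rate)
    print(f"{finish_min} min -> {rate:.0f} msg/s, {per_batch:.2f} s per 2 s batch")

last_delay, last_finish = simulate(int(TEST_S / INTERVAL_S), INTERVAL_S, 3.0)
print(f"last batch waited {last_delay:.0f} s; all done after {last_finish:.0f} s")
```

When each 2-second batch needs about 3 seconds, every batch starts about 1 second later than the one before. The last of the 120 batches therefore waits roughly 2 minutes before it even starts. The Streaming tab of the Spark UI reports this growth as scheduling delay, which is one way to follow Javier's advice to locate the bottleneck there.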
>>>>>>
>>>>>> My code (PySpark) is posted at the end of this email so you can take
>>>>>> a look. I tried some different Cassandra configurations following this
>>>>>> link: http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark
>>>>>> (recommended on Stack Overflow for similar questions).
>>>>>>
>>>>>> Thank you in advance,
>>>>>>
>>>>>> Best Regards,
>>>>>> Saulo
>>>>>>
>>>>>> =============== # CODE # =================================
>>>>>> ####
>>>>>> # run command:
>>>>>> # spark2-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2 --conf spark.cassandra.connection.host='localhost' --num-executors 2 --executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2
>>>>>> ##
>>>>>>
>>>>>> # Run Spark imports
>>>>>> import sys  # needed for sys.argv below
>>>>>> from pyspark import SparkConf  # SparkContext, SparkConf
>>>>>> from pyspark.streaming import StreamingContext
>>>>>> from pyspark.streaming.kafka import KafkaUtils
>>>>>>
>>>>>> # Run Cassandra imports
>>>>>> import pyspark_cassandra
>>>>>> from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>>>>>>
>>>>>> def recordHandler(record):
>>>>>>     # parseData and processMetrics are defined elsewhere (not shown)
>>>>>>     (mid, tt, in_tt, sid, mv) = parseData( record )
>>>>>>     return processMetrics(mid, tt, in_tt, sid, mv)
>>>>>>
>>>>>> def process(time, rdd):
>>>>>>     # each element is a Kafka (key, value) tuple; w[1] is the message value
>>>>>>     rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
>>>>>>     if rdd2.count() > 0:
>>>>>>         return rdd2
>>>>>>
>>>>>> def casssave(time, rdd):
>>>>>>     rdd.saveToCassandra( "test_hdpkns", "measurement" )
>>>>>>
>>>>>> # ...
>>>>>> brokers, topic = sys.argv[1:]
>>>>>>
>>>>>> # ...
>>>>>>
>>>>>> sconf = SparkConf() \
>>>>>>     .setAppName("SensorDataStreamHandler") \
>>>>>>     .setMaster("local[*]") \
>>>>>>     .set("spark.default.parallelism", "2")
>>>>>>
>>>>>> sc = CassandraSparkContext(conf = sconf)
>>>>>> batchIntervalSeconds = 2
>>>>>> ssc = StreamingContext(sc, batchIntervalSeconds)
>>>>>>
>>>>>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>>>>>                                             {"metadata.broker.list": brokers})
>>>>>>
>>>>>> kafkaStream \
>>>>>>     .transform(process) \
>>>>>>     .foreachRDD(casssave)
>>>>>>
>>>>>> ssc.start()
>>>>>> ssc.awaitTermination()
>>>>>>
>>>>>> ================================================
>>
>> --
>> Alonso Isidoro Roman
>> about.me/alonso.isidoro.roman
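Javier's first suggestion, removing `rdd2.count()` from `process`, follows from RDD laziness. `count()` is an action, so it evaluates the `map` over the whole batch. The later `saveToCassandra` in `casssave` then evaluates it again, because `rdd2` is not cached. The sketch below is a toy pure-Python model, not PySpark: `LazyRDD`, `CountingHandler`, and the `collect()` stand-in for the Cassandra write are hypothetical. In real Spark, each pass also incurs job scheduling and, for PySpark, the Java-Python serialization Russell describes.

```python
from typing import Callable, Iterable, List, Optional


class LazyRDD:
    """Toy model of an uncached RDD: map() only records a function, and
    every action recomputes the whole lineage from the source data."""

    def __init__(self, source: Iterable, funcs: Optional[List[Callable]] = None):
        self._source = list(source)
        self._funcs = list(funcs) if funcs else []

    def map(self, f: Callable) -> "LazyRDD":
        return LazyRDD(self._source, self._funcs + [f])

    def _compute(self) -> List:
        out = []
        for x in self._source:
            for f in self._funcs:
                x = f(x)
            out.append(x)
        return out

    def count(self) -> int:      # action
        return len(self._compute())

    def collect(self) -> List:   # action; stands in for saveToCassandra
        return self._compute()


class CountingHandler:
    """Hypothetical stand-in for recordHandler that counts its calls."""

    def __init__(self) -> None:
        self.calls = 0

    def __call__(self, record: str) -> str:
        self.calls += 1
        return record.upper()


def with_count(rdd: LazyRDD, handler: CountingHandler) -> List:
    rdd2 = rdd.map(handler)
    if rdd2.count() > 0:         # first pass over the batch
        return rdd2.collect()    # second pass: the map runs again
    return []


def without_count(rdd: LazyRDD, handler: CountingHandler) -> List:
    return rdd.map(handler).collect()   # a single pass


batch = LazyRDD(["m1", "m2", "m3"])
h1, h2 = CountingHandler(), CountingHandler()
with_count(batch, h1)
without_count(batch, h2)
print(h1.calls, h2.calls)   # 6 3
```

Caching `rdd2` would also avoid the second pass, but it adds the extra storage and serialization step Russell warns about. Simply dropping the `count()` avoids both.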