The most likely answer is that mixed Java-Python code incurs a penalty for every object that is converted from a Java object into a Python object (and then back again into a Java object) as data is passed in and out of your functions. A way around this would probably be to use the DataFrame API if possible, which would compile the interactions in Java and skip the Python-Java serialization. Using Scala from the start, though, is a great idea. I would also probably remove the cache from your stream, since it is probably only hurting (it adds an additional serialization for data that is only used once).
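To give a rough feel for the per-object penalty described above, here is a minimal sketch in plain Python. It is not Spark code: `cross_boundary` and `parse` are hypothetical helpers, the message format is made up, and a `pickle` round trip only stands in for one hop between the JVM and a Python worker. PySpark pickles records in batches rather than one at a time, as far as I know, so treat this as a simplification that shows where the extra work comes from, not as a measurement of Spark itself.

```python
import pickle
import time


def cross_boundary(obj):
    """Serialize and deserialize one object, standing in for a JVM <-> Python hop."""
    return pickle.loads(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


def parse(line):
    """Hypothetical parser: 'mid,sensor_id,measure' -> typed tuple."""
    mid, sensor_id, measure = line.split(",")
    return int(mid), int(sensor_id), float(measure)


# Made-up sensor messages; the count is arbitrary.
lines = [f"{i},{i % 10},{i * 0.5}" for i in range(20_000)]

# Parsing inside one runtime: no serialization at all.
start = time.perf_counter()
in_process = [parse(line) for line in lines]
in_process_s = time.perf_counter() - start

# Parsing with one hop in (the input record) and one hop out (the result).
start = time.perf_counter()
with_hops = [cross_boundary(parse(cross_boundary(line))) for line in lines]
with_hops_s = time.perf_counter() - start

print(f"in-process: {in_process_s:.3f}s, with hops: {with_hops_s:.3f}s")
```

Both variants produce the same parsed tuples; the second one simply does two extra serialization round trips per record, which is the kind of overhead the DataFrame API would be expected to avoid by keeping the work on the Java side.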
On Mon, May 21, 2018 at 5:01 AM Alonso Isidoro Roman <alons...@gmail.com> wrote:

> The main language they developed spark with is scala, so all the new
> features go first to scala, java and finally python. I'm not surprised by
> the results, we've seen it on Stratio since the first versions of spark. At
> the beginning of development, some of our engineers make the prototype with
> python, but when it comes down to it, if it goes into production, it has to
> be rewritten in scala or java, usually scala.
>
> On Mon, 21 May 2018 at 4:34, Saulo Sobreiro (<saulo.sobre...@outlook.pt>) wrote:
>
>> Hi Javier,
>>
>> Thank you a lot for the feedback.
>> Indeed the CPU is a huge limitation. I got a lot of trouble trying to run
>> this use case in yarn-client mode. I managed to run this in standalone
>> (local master) mode only.
>>
>> I do not have the hardware available to run this setup in a cluster yet,
>> so I decided to dig a little bit more in the implementation to see what
>> could I improve. I just finished evaluating some results.
>> If you find something wrong or odd please let me know.
>>
>> Following your suggestion to use "saveToCassandra" directly I decided to
>> try Scala. Everything was implemented in the most similar way possible and
>> I got surprised by the results. The scala implementation is much faster.
>>
>> My current implementation is slightly different from the Python code
>> shared some emails ago but to compare the languages influence in the most
>> comparable way I used the following snippets:
>>
>> # Scala implementation ------------------
>>
>> val kstream = KafkaUtils.createDirectStream[String, String](
>>     ssc,
>>     LocationStrategies.PreferConsistent,
>>     ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
>> kstream
>>     .map( x => parse(x.value) )
>>     .saveToCassandra("hdpkns", "batch_measurement")
>>
>> # Python implementation ----------------
>> # Adapted from the previously shared code.
>> # However instead of calculating the metrics, it is just parsing the messages.
>>
>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>     {"metadata.broker.list": brokers})
>>
>> kafkaStream \
>>     .transform(parse) \
>>     .foreachRDD(casssave)
>>
>> For the same streaming input the scala app took an average of ~1.5
>> seconds to handle each event. For the python implementation, the app took
>> an average of ~80 seconds to handle each event (and after a lot of pickle
>> concurrency access issues).
>>
>> Note that I considered the time as the difference between the event
>> generation (before being published to Kafka) and the moment just before the
>> saveToCassandra.
>>
>> The problem in the python implementation seems to be due to the delay
>> introduced by the foreachRDD(casssave) call, which only runs
>> rdd.saveToCassandra( "test_hdpkns", "measurement" ).
>>
>> Honestly I was not expecting such a difference between these 2 codes...
>> Can you understand why is this happening ?
>>
>> Again, Thank you very much for your help,
>>
>> Best Regards
>>
>> Sharing my current Scala code below
>> # Scala Snippet =========================
>> val sparkConf = new SparkConf(). // ...
>> val ssc = new StreamingContext(sparkConf, Seconds(1))
>> val sc = ssc.sparkContext
>> //...
>> val kstream = KafkaUtils.createDirectStream[String, String](
>>     ssc,
>>     LocationStrategies.PreferConsistent,
>>     ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
>> //...
>> // handle Kafka messages in a parallel fashion
>> val ckstream = kstream.map( x => parse(x.value) ).cache()
>> ckstream
>>     .foreachRDD( rdd => {
>>         rdd.foreach(metrics)
>>     } )
>> ckstream
>>     .saveToCassandra("hdpkns", "microbatch_raw_measurement")
>> #=========================
>>
>> On 30/04/2018 14:57:50, Javier Pareja <pareja.jav...@gmail.com> wrote:
>> Hi Saulo,
>>
>> If the CPU is close to 100% then you are hitting the limit.
>> I don't think that moving to Scala will make a difference. Both Spark and
>> Cassandra are CPU hungry, your setup is small in terms of CPUs. Try running
>> Spark on another (physical) machine so that the 2 cores are dedicated to
>> Cassandra.
>>
>> Kind Regards
>> Javier
>>
>> On Mon, 30 Apr 2018, 14:24 Saulo Sobreiro, <saulo.sobre...@outlook.pt> wrote:
>>
>>> Hi Javier,
>>>
>>> I will try to implement this in scala then. As far as I can see in the
>>> documentation there is no SaveToCassandra in the python interface unless
>>> you are working with dataframes and the kafkaStream instance does not
>>> provide methods to convert an RDD into DF.
>>>
>>> Regarding my table, it is very simple (see below). Can I change
>>> something to make it write faster?
>>>
>>> CREATE TABLE test_hdpkns.measurement (
>>>     mid bigint,
>>>     tt timestamp,
>>>     in_tt timestamp,
>>>     out_tt timestamp,
>>>     sensor_id int,
>>>     measure double,
>>>     PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
>>> ) with compact storage;
>>>
>>> The system CPU while the demo is running is almost always at 100% for
>>> both cores.
>>>
>>> Thank you.
>>>
>>> Best Regards,
>>>
>>> On 29/04/2018 20:46:30, Javier Pareja <pareja.jav...@gmail.com> wrote:
>>> Hi Saulo,
>>>
>>> I meant using this to save:
>>>
>>> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#writing-to-cassandra-from-a-stream
>>>
>>> But it might be slow on a different area.
>>> Another point is that Cassandra and spark running on the same machine
>>> might compete for resources which will slow down the insert. You can check
>>> the CPU usage of the machine at the time. Also the design of the table
>>> schema can make a big difference.
>>>
>>> On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro, <saulo.sobre...@outlook.pt> wrote:
>>>
>>>> Hi Javier,
>>>>
>>>> I removed the map and used "map" directly instead of using transform,
>>>> but the *kafkaStream* is created with KafkaUtils which does not have a
>>>> method to save to cassandra directly.
>>>>
>>>> Do you know any workaround for this?
>>>>
>>>> Thank you for the suggestion.
>>>>
>>>> Best Regards,
>>>>
>>>> On 29/04/2018 17:03:24, Javier Pareja <pareja.jav...@gmail.com> wrote:
>>>> Hi Saulo,
>>>>
>>>> I'm no expert but I will give it a try.
>>>> I would remove the rdd2.count(), I can't see the point and you will
>>>> gain performance right away. Because of this, I would not use a transform,
>>>> just directly the map.
>>>> I have not used python but in Scala the cassandra-spark connector can
>>>> save directly to Cassandra without a foreachRDD.
>>>>
>>>> Finally I would use the spark UI to find which stage is the bottleneck
>>>> here.
>>>>
>>>> On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <saulo.sobre...@outlook.pt> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am implementing a use case where I read some sensor data from Kafka
>>>>> with SparkStreaming interface (*KafkaUtils.createDirectStream*) and,
>>>>> after some transformations, write the output (RDD) to Cassandra.
>>>>>
>>>>> Everything is working properly but I am having some trouble with the
>>>>> performance. My kafka topic receives around 2000 messages per second. For
>>>>> a 4 min. test, the SparkStreaming app takes 6~7 min. to process and write
>>>>> to Cassandra, which is not acceptable for longer runs.
>>>>>
>>>>> I am running this application in a "sandbox" with 12GB of RAM, 2 cores
>>>>> and 30GB SSD space.
>>>>> Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).
>>>>>
>>>>> I would like to know if you have some suggestion to improve performance
>>>>> (other than getting more resources :) ).
>>>>>
>>>>> My code (pyspark) is posted in the end of this email so you can take a
>>>>> look. I tried some different cassandra configurations following this link:
>>>>> http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark
>>>>> (recommended in stackoverflow for similar questions).
>>>>>
>>>>> Thank you in advance,
>>>>>
>>>>> Best Regards,
>>>>> Saulo
>>>>>
>>>>> =============== # CODE # =================================
>>>>> ####
>>>>> # run command:
>>>>> # spark2-submit --packages
>>>>> #   org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2
>>>>> #   --conf spark.cassandra.connection.host='localhost' --num-executors 2
>>>>> #   --executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2
>>>>> ##
>>>>>
>>>>> # Run Spark imports
>>>>> from pyspark import SparkConf # SparkContext, SparkConf
>>>>> from pyspark.streaming import StreamingContext
>>>>> from pyspark.streaming.kafka import KafkaUtils
>>>>>
>>>>> # Run Cassandra imports
>>>>> import pyspark_cassandra
>>>>> from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>>>>>
>>>>> def recordHandler(record):
>>>>>     (mid, tt, in_tt, sid, mv) = parseData( record )
>>>>>     return processMetrics(mid, tt, in_tt, sid, mv)
>>>>>
>>>>> def process(time, rdd):
>>>>>     rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
>>>>>     if rdd2.count() > 0:
>>>>>         return rdd2
>>>>>
>>>>> def casssave(time, rdd):
>>>>>     rdd.saveToCassandra( "test_hdpkns", "measurement" )
>>>>>
>>>>> # ...
>>>>> brokers, topic = sys.argv[1:]
>>>>>
>>>>> # ...
>>>>>
>>>>> sconf = SparkConf() \
>>>>>     .setAppName("SensorDataStreamHandler") \
>>>>>     .setMaster("local[*]") \
>>>>>     .set("spark.default.parallelism", "2")
>>>>>
>>>>> sc = CassandraSparkContext(conf = sconf)
>>>>> batchIntervalSeconds = 2
>>>>> ssc = StreamingContext(sc, batchIntervalSeconds)
>>>>>
>>>>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>>>>     {"metadata.broker.list": brokers})
>>>>>
>>>>> kafkaStream \
>>>>>     .transform(process) \
>>>>>     .foreachRDD(casssave)
>>>>>
>>>>> ssc.start()
>>>>> ssc.awaitTermination()
>>>>>
>>>>> ================================================
>
> --
> Alonso Isidoro Roman
> about.me/alonso.isidoro.roman