[Spark 2.1] SparkStreaming to Cassandra performance problem
Hi all,

I am implementing a use case where I read sensor data from Kafka with the Spark Streaming interface (KafkaUtils.createDirectStream) and, after some transformations, write the output (an RDD) to Cassandra. Everything is working properly, but I am having some trouble with the performance. My Kafka topic receives around 2000 messages per second. For a 4 min. test, the Spark Streaming app takes 6~7 min. to process and write to Cassandra, which is not acceptable for longer runs.

I am running this application in an HDP "sandbox" with 12 GB of RAM, 2 cores and 30 GB of SSD space. Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).

I would like to know if you have any suggestions to improve performance (other than getting more resources :) ). My code (pyspark) is posted at the end of this email so you can take a look. I tried some different Cassandra configurations following this link: http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark (recommended on Stack Overflow for similar questions).

Thank you in advance,
Best Regards,
Saulo

=== # CODE # ===

# run command:
# spark2-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2 \
#   --conf spark.cassandra.connection.host='localhost' --num-executors 2 --executor-cores 2 \
#   SensorDataStreamHandler.py localhost:6667 test_topic2

import sys

# Spark imports
from pyspark import SparkConf  # SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Cassandra imports
import pyspark_cassandra
from pyspark_cassandra import CassandraSparkContext, saveToCassandra

def recordHandler(record):
    (mid, tt, in_tt, sid, mv) = parseData(record)
    return processMetrics(mid, tt, in_tt, sid, mv)

def process(time, rdd):
    rdd2 = rdd.map(lambda w: recordHandler(w[1]))
    if rdd2.count() > 0:
        return rdd2

def casssave(time, rdd):
    rdd.saveToCassandra("test_hdpkns", "measurement")

# ...
brokers, topic = sys.argv[1:]
# ...

sconf = SparkConf() \
    .setAppName("SensorDataStreamHandler") \
    .setMaster("local[*]") \
    .set("spark.default.parallelism", "2")

sc = CassandraSparkContext(conf=sconf)

batchIntervalSeconds = 2
ssc = StreamingContext(sc, batchIntervalSeconds)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

kafkaStream \
    .transform(process) \
    .foreachRDD(casssave)

ssc.start()
ssc.awaitTermination()
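Two things are worth noting in the code above. First, process() calls rdd2.count() on every batch just to decide whether to return it, which forces each batch to be computed twice: once for the count and once again for the Cassandra write. Second, returning None from a transform() callback on an empty batch will itself fail, since transform() must return an RDD. (As an aside, the --packages line pulls Spark 1.x artifacts; for a Spark 2.1 runtime the matching Kafka artifact would be org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0.) A minimal sketch of the same pipeline doing a single pass per batch, assuming the same parseData/processMetrics helpers and the pyspark-cassandra saveToCassandra call from the original:

# sketch, not the original code: map and save in a single foreachRDD,
# so each batch is evaluated only once
def processAndSave(time, rdd):
    rdd2 = rdd.map(lambda w: recordHandler(w[1]))
    # isEmpty() inspects partitions only until it finds one element,
    # which is far cheaper than a full count()
    if not rdd2.isEmpty():
        rdd2.saveToCassandra("test_hdpkns", "measurement")

kafkaStream.foreachRDD(processAndSave)

With only 2 cores and spark.default.parallelism set to 2, a batch interval longer than 2 seconds may also help, since each micro-batch then amortizes more of the per-batch scheduling overhead.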
Re: [Spark 2.x Core] .collect() size limit
spark.driver.maxResultSize

http://spark.apache.org/docs/latest/configuration.html

On Sat, Apr 28, 2018 at 8:41 AM, klrmowse wrote:
> i am currently trying to find a workaround for the Spark application i am
> working on so that it does not have to use .collect()
>
> but, for now, it is going to have to use .collect()
>
> what is the size limit (memory for the driver) of RDD file that .collect()
> can work with?
>
> i've been scouring google-search - S.O., blogs, etc, and everyone is
> cautioning about .collect(), but does not specify how huge is huge... are we
> talking about a few gigabytes? terabytes?? petabytes???
>
> thank you
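For context: spark.driver.maxResultSize caps the total serialized size of all partition results for a single action such as collect(); jobs that exceed it are aborted with an error rather than silently overloading the driver. A minimal sketch of raising it (the app name and values here are illustrative, not recommendations):

# sketch: raise the result-size cap from the default of 1g.
# The collected data still has to fit in the driver's heap, so the driver
# also needs enough memory, e.g.: spark-submit --driver-memory 8g app.py
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("big-collect")                 # hypothetical app name
        .set("spark.driver.maxResultSize", "4g"))  # "0" removes the cap entirely

sc = SparkContext(conf=conf)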
Re: A naive ML question
Hi,

I mean a transaction typically goes through different states like STARTED, PENDING, CANCELLED, COMPLETED, SETTLED etc...

Thanks,
kant

On Sat, Apr 28, 2018 at 4:11 AM, Jörn Franke wrote:
> What do you mean by “how it evolved over time”? A transaction describes
> basically an action at a certain point of time. Do you mean how a financial
> product evolved over time given a set of transactions?
>
> > On 28. Apr 2018, at 12:46, kant kodali wrote:
> >
> > Hi All,
> >
> > I have a bunch of financial transactional data and I was wondering if
> > there is any ML model that can give me a graph structure for this data? In
> > other words, show how a transaction has evolved over time?
> >
> > Any suggestions or references would help.
> >
> > Thanks!
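Given states like these, one concrete way to get a graph structure out of the data (short of a full ML model) is a weighted state-transition graph: nodes are states, edge weights count observed transitions. A small pyspark sketch, assuming hypothetical (transaction_id, (timestamp, state)) records; all names are made up for illustration:

from pyspark import SparkContext

sc = SparkContext(appName="TransitionGraphSketch")

# hypothetical sample events: (transaction_id, (timestamp, state))
events = sc.parallelize([
    ("t1", (1, "STARTED")), ("t1", (2, "PENDING")), ("t1", (3, "COMPLETED")),
    ("t2", (1, "STARTED")), ("t2", (2, "CANCELLED")),
])

def transitions(timed_states):
    # order one transaction's events by timestamp, then pair consecutive states
    ordered = [state for _, state in sorted(timed_states)]
    return list(zip(ordered, ordered[1:]))

# weighted edge list: ((from_state, to_state), count)
edges = (events.groupByKey()
               .flatMap(lambda kv: transitions(kv[1]))
               .map(lambda edge: (edge, 1))
               .reduceByKey(lambda a, b: a + b))

print(edges.collect())
# [(('STARTED', 'PENDING'), 1), (('PENDING', 'COMPLETED'), 1), (('STARTED', 'CANCELLED'), 1)]

The resulting edge list can then be fed into GraphFrames or networkx for analysis or visualization.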
Re: [Spark 2.x Core] .collect() size limit
I believe the virtualization of memory happens at the OS layer, hiding it completely from the application layer.

On Sat, 28 Apr 2018, 22:22 Stephen Boesch wrote:
> While it is certainly possible to use VM, I have seen in a number of places
> warnings that collect() results must fit in memory. I'm not sure if that
> applies to *all* spark calculations: but at the very least each of the
> specific collect()'s that are performed would need to be verified.
>
> And maybe *all* collects do require sufficient memory - would you like to
> check the source code to see if there were disk backed collects actually
> happening for some cases?
>
> 2018-04-28 9:48 GMT-07:00 Deepak Goel:
>
>> There is something as *virtual memory*
>>
>> On Sat, 28 Apr 2018, 21:19 Stephen Boesch wrote:
>>
>>> Do you have a machine with terabytes of RAM? afaik collect() requires
>>> RAM - so that would be your limiting factor.
>>>
>>> 2018-04-28 8:41 GMT-07:00 klrmowse:
>>>
>>>> i am currently trying to find a workaround for the Spark application i am
>>>> working on so that it does not have to use .collect()
>>>>
>>>> but, for now, it is going to have to use .collect()
>>>>
>>>> what is the size limit (memory for the driver) of RDD file that .collect()
>>>> can work with?
>>>>
>>>> i've been scouring google-search - S.O., blogs, etc, and everyone is
>>>> cautioning about .collect(), but does not specify how huge is huge... are we
>>>> talking about a few gigabytes? terabytes?? petabytes???
>>>>
>>>> thank you
Re: [Spark 2.x Core] .collect() size limit
While it is certainly possible to use VM, I have seen in a number of places warnings that collect() results must fit in memory. I'm not sure if that applies to *all* spark calculations: but at the very least each of the specific collect()'s that are performed would need to be verified.

And maybe *all* collects do require sufficient memory - would you like to check the source code to see if there were disk backed collects actually happening for some cases?

2018-04-28 9:48 GMT-07:00 Deepak Goel:
> There is something as *virtual memory*
>
> On Sat, 28 Apr 2018, 21:19 Stephen Boesch wrote:
>
>> Do you have a machine with terabytes of RAM? afaik collect() requires
>> RAM - so that would be your limiting factor.
>>
>> 2018-04-28 8:41 GMT-07:00 klrmowse:
>>
>>> i am currently trying to find a workaround for the Spark application i am
>>> working on so that it does not have to use .collect()
>>>
>>> but, for now, it is going to have to use .collect()
>>>
>>> what is the size limit (memory for the driver) of RDD file that .collect()
>>> can work with?
>>>
>>> i've been scouring google-search - S.O., blogs, etc, and everyone is
>>> cautioning about .collect(), but does not specify how huge is huge... are we
>>> talking about a few gigabytes? terabytes?? petabytes???
>>>
>>> thank you
Re: [Spark 2.x Core] .collect() size limit
There is something as *virtual memory*

On Sat, 28 Apr 2018, 21:19 Stephen Boesch wrote:
> Do you have a machine with terabytes of RAM? afaik collect() requires
> RAM - so that would be your limiting factor.
>
> 2018-04-28 8:41 GMT-07:00 klrmowse:
>
>> i am currently trying to find a workaround for the Spark application i am
>> working on so that it does not have to use .collect()
>>
>> but, for now, it is going to have to use .collect()
>>
>> what is the size limit (memory for the driver) of RDD file that .collect()
>> can work with?
>>
>> i've been scouring google-search - S.O., blogs, etc, and everyone is
>> cautioning about .collect(), but does not specify how huge is huge... are we
>> talking about a few gigabytes? terabytes?? petabytes???
>>
>> thank you
Re: [Spark 2.x Core] .collect() size limit
Do you have a machine with terabytes of RAM? afaik collect() requires RAM - so that would be your limiting factor.

2018-04-28 8:41 GMT-07:00 klrmowse:
> i am currently trying to find a workaround for the Spark application i am
> working on so that it does not have to use .collect()
>
> but, for now, it is going to have to use .collect()
>
> what is the size limit (memory for the driver) of RDD file that .collect()
> can work with?
>
> i've been scouring google-search - S.O., blogs, etc, and everyone is
> cautioning about .collect(), but does not specify how huge is huge... are we
> talking about a few gigabytes? terabytes?? petabytes???
>
> thank you
[Spark 2.x Core] .collect() size limit
i am currently trying to find a workaround for the Spark application i am working on so that it does not have to use .collect()

but, for now, it is going to have to use .collect()

what is the size limit (memory for the driver) of RDD file that .collect() can work with?

i've been scouring google-search - S.O., blogs, etc, and everyone is cautioning about .collect(), but does not specify how huge is huge... are we talking about a few gigabytes? terabytes?? petabytes???

thank you
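To put rough numbers on "how huge is huge": the hard cap is spark.driver.maxResultSize (1g by default, see the reply above), and beyond that everything collected must fit in the driver's heap, so the practical answer is gigabytes, not terabytes. Where full materialization on the driver isn't strictly needed, a few bounded alternatives, sketched with hypothetical names:

# alternatives to rdd.collect(), which pulls every element into the
# driver's memory at once

first_n = rdd.take(1000)                 # only the first N elements

# stream partitions through the driver one at a time instead of all at once
for record in rdd.toLocalIterator():
    handle(record)                       # 'handle' is a placeholder

# or keep the data distributed and write it out instead
rdd.saveAsTextFile("hdfs:///tmp/collect-output")   # illustrative path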
Sequence file to Image in spark
Hi All,

I am trying to convert a sequence file to images in Spark. I found that when I read the bytes through a ByteArrayInputStream it fails with the exception below. Any insight will be helpful.

scala> sc.sequenceFile[NullWritable,BytesWritable]("D:/seqImage").map(x => {ImageIO.write(ImageIO.read(new ByteArrayInputStream(x._2.copyBytes())),"png",new File("D:/ima"))}).collect

2018-04-28 15:45:52 ERROR Executor:91 - Exception in task 0.0 in stage 8.0 (TID 14)
java.lang.IllegalArgumentException: image == null!
        at javax.imageio.ImageTypeSpecifier.createFromRenderedImage(Unknown Source)
        at javax.imageio.ImageIO.getWriter(Unknown Source)
        at javax.imageio.ImageIO.write(Unknown Source)
        at $line117.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:31)
        at $line117.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:31)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
2018-04-28 15:45:52 WARN TaskSetManager:66 - Lost task 0.0 in stage 8.0 (TID 14, localhost, executor driver): java.lang.IllegalArgumentException: image == null!
        at javax.imageio.ImageTypeSpecifier.createFromRenderedImage(Unknown Source)

--
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Dataframe vs dataset
OK, from the language you used, you are kind of saying that Dataset is a subset of DataFrame. I would disagree, because to me a DataFrame is just a Dataset of org.apache.spark.sql.Row (in the Scala API, DataFrame is literally a type alias for Dataset[Row]).

On Sat, Apr 28, 2018, 8:34 AM Marco Mistroni wrote:
> Imho, neither... I see datasets as typed df and therefore ds are enhanced df.
> Feel free to disagree...
> Kr
>
> On Sat, Apr 28, 2018, 2:24 PM Michael Artz wrote:
>
>> Hi,
>>
>> I use Spark every day and I have a good grip on the basics of Spark, so
>> this question isn't for myself. But this came up and I wanted to see what
>> other Spark users would say, and I don't want to influence your answer. And
>> SO is weird about polls. The question is
>>
>> "Which one do you feel is accurate... Dataset is a subset of DataFrame,
>> or DataFrame a subset of Dataset?"
Re: Dataframe vs dataset
Imho, neither... I see datasets as typed df and therefore ds are enhanced df.
Feel free to disagree...
Kr

On Sat, Apr 28, 2018, 2:24 PM Michael Artz wrote:
> Hi,
>
> I use Spark every day and I have a good grip on the basics of Spark, so
> this question isn't for myself. But this came up and I wanted to see what
> other Spark users would say, and I don't want to influence your answer. And
> SO is weird about polls. The question is
>
> "Which one do you feel is accurate... Dataset is a subset of DataFrame,
> or DataFrame a subset of Dataset?"
Dataframe vs dataset
Hi,

I use Spark every day and I have a good grip on the basics of Spark, so this question isn't for myself. But this came up and I wanted to see what other Spark users would say, and I don't want to influence your answer. And SO is weird about polls. The question is

"Which one do you feel is accurate... Dataset is a subset of DataFrame, or DataFrame a subset of Dataset?"
Re: A naive ML question
What do you mean by “how it evolved over time”? A transaction describes basically an action at a certain point of time. Do you mean how a financial product evolved over time given a set of transactions?

> On 28. Apr 2018, at 12:46, kant kodali wrote:
>
> Hi All,
>
> I have a bunch of financial transactional data and I was wondering if there
> is any ML model that can give me a graph structure for this data? In other
> words, show how a transaction has evolved over time?
>
> Any suggestions or references would help.
>
> Thanks!
A naive ML question
Hi All,

I have a bunch of financial transactional data and I was wondering if there is any ML model that can give me a graph structure for this data? In other words, show how a transaction has evolved over time?

Any suggestions or references would help.

Thanks!