[Spark2.1] SparkStreaming to Cassandra performance problem

2018-04-28 Thread Saulo Sobreiro
Hi all,

I am implementing a use case where I read sensor data from Kafka with the 
Spark Streaming interface (KafkaUtils.createDirectStream) and, after some 
transformations, write the output (RDD) to Cassandra.

Everything is working properly, but I am having some trouble with performance. 
My Kafka topic receives around 2000 messages per second. For a 4-minute test, 
the Spark Streaming app takes 6-7 minutes to process and write to Cassandra, 
which is not acceptable for longer runs.

I am running this application in an HDP "sandbox" with 12GB of RAM, 2 cores, and 
30GB of SSD space.
Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).

I would like to know if you have any suggestions to improve performance (other 
than getting more resources :) ).

My code (pyspark) is posted at the end of this email so you can take a look. I 
tried some different Cassandra configurations following this link: 
http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark
 (recommended on Stack Overflow for similar questions).


Thank you in advance,

Best Regards,
Saulo



=== # CODE # =

# run command:
# spark2-submit \
#     --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2 \
#     --conf spark.cassandra.connection.host='localhost' \
#     --num-executors 2 --executor-cores 2 \
#     SensorDataStreamHandler.py localhost:6667 test_topic2
##

import sys

# Spark imports
from pyspark import SparkConf  # SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Cassandra imports
import pyspark_cassandra
from pyspark_cassandra import CassandraSparkContext, saveToCassandra

def recordHandler(record):
    # parseData / processMetrics are application-specific helpers (not shown)
    (mid, tt, in_tt, sid, mv) = parseData(record)
    return processMetrics(mid, tt, in_tt, sid, mv)

def process(time, rdd):
    rdd2 = rdd.map(lambda w: recordHandler(w[1]))
    if rdd2.count() > 0:
        return rdd2

def casssave(time, rdd):
    rdd.saveToCassandra("test_hdpkns", "measurement")

# ...
brokers, topic = sys.argv[1:]

# ...

sconf = SparkConf() \
    .setAppName("SensorDataStreamHandler") \
    .setMaster("local[*]") \
    .set("spark.default.parallelism", "2")

sc = CassandraSparkContext(conf=sconf)
batchIntervalSeconds = 2
ssc = StreamingContext(sc, batchIntervalSeconds)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
                                            {"metadata.broker.list": brokers})

kafkaStream \
    .transform(process) \
    .foreachRDD(casssave)

ssc.start()
ssc.awaitTermination()
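
For reference, a minimal sketch of the tuning usually suggested for this write path, reusing recordHandler and kafkaStream from the code above: drop the per-batch count() (it forces each micro-batch to be computed twice, and process() returns None for an empty batch), map and save in a single foreachRDD, and pass write-tuning options to the Cassandra connector. The option names below are the DataStax spark-cassandra-connector's; whether the anguenot pyspark-cassandra 0.7.0 wrapper forwards them is an assumption, and map_and_save is just an illustrative name, so treat this as a sketch rather than a drop-in fix.

# Sketch only (not the poster's code): same pipeline without the extra
# count() action, plus Cassandra connector write tuning. Option names assume
# the DataStax spark-cassandra-connector; verify the pyspark-cassandra
# wrapper in use actually honors them.
sconf = SparkConf() \
    .setAppName("SensorDataStreamHandler") \
    .setMaster("local[*]") \
    .set("spark.default.parallelism", "2") \
    .set("spark.cassandra.output.concurrent.writes", "8") \
    .set("spark.cassandra.output.batch.grouping.key", "partition")

def map_and_save(time, rdd):
    # illustrative helper: map and save in one pass, so each batch is
    # evaluated exactly once and an empty batch is simply a no-op write
    rdd.map(lambda w: recordHandler(w[1])) \
       .saveToCassandra("test_hdpkns", "measurement")

kafkaStream.foreachRDD(map_and_save)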








Re: [Spark 2.x Core] .collect() size limit

2018-04-28 Thread Mark Hamstra
spark.driver.maxResultSize

http://spark.apache.org/docs/latest/configuration.html
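
For completeness, a minimal sketch of how that limit can be raised when the SparkContext is created; the 4g value is only an illustrative assumption, and "0" disables the check entirely (at which point the real ceiling becomes the driver heap, which has to be set at launch time):

# Sketch: raising spark.driver.maxResultSize (default 1g in Spark 2.x).
# "0" removes the check, but the collected result still has to fit in the
# driver heap, and spark.driver.memory must be set at launch
# (e.g. spark-submit --driver-memory 8g), not from inside the application.
from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.driver.maxResultSize", "4g")
sc = SparkContext(conf=conf)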

On Sat, Apr 28, 2018 at 8:41 AM, klrmowse  wrote:

> i am currently trying to find a workaround for the Spark application i am
> working on so that it does not have to use .collect()
>
> but, for now, it is going to have to use .collect()
>
> what is the size limit (memory for the driver) of RDD file that .collect()
> can work with?
>
> i've been scouring google-search - S.O., blogs, etc, and everyone is
> cautioning about .collect(), but does not specify how huge is huge... are
> we
> talking about a few gigabytes? terabytes?? petabytes???
>
>
>
> thank you
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: A naive ML question

2018-04-28 Thread kant kodali
Hi,

I mean a transaction typically goes through different states like
STARTED, PENDING, CANCELLED, COMPLETED, SETTLED, etc...

Thanks,
kant

On Sat, Apr 28, 2018 at 4:11 AM, Jörn Franke  wrote:

> What do you mean by “how it evolved over time” ? A transaction describes
> basically an action at a certain point of time. Do you mean how a financial
> product evolved over time given a set of a transactions?
>
> > On 28. Apr 2018, at 12:46, kant kodali  wrote:
> >
> > Hi All,
> >
> > I have a bunch of financial transactional data and I was wondering if
> there is any ML model that can give me a graph structure for this data?
> other words, show how a transaction had evolved over time?
> >
> > Any suggestions or references would help.
> >
> > Thanks!
> >
>


Re: [Spark 2.x Core] .collect() size limit

2018-04-28 Thread Deepak Goel
I believe the virtualization of memory happens at the OS layer, hiding it
completely from the application layer

On Sat, 28 Apr 2018, 22:22 Stephen Boesch,  wrote:

> While it is certainly possible to use VM I have seen in a number of places
> warnings that collect() results must be able to be fit in memory. I'm not
> sure if that applies to *all" spark calculations: but in the very least
> each of the specific collect()'s that are performed would need to be
> verified.
>
> And maybe *all *collects do require sufficient memory - would you like to
> check the source code to see if there were disk backed collects actually
> happening for some cases?
>
> 2018-04-28 9:48 GMT-07:00 Deepak Goel :
>
>> There is something as *virtual memory*
>>
>> On Sat, 28 Apr 2018, 21:19 Stephen Boesch,  wrote:
>>
>>> Do you have a machine with  terabytes of RAM?  afaik collect() requires
>>> RAM - so that would be your limiting factor.
>>>
>>> 2018-04-28 8:41 GMT-07:00 klrmowse :
>>>
 i am currently trying to find a workaround for the Spark application i
 am
 working on so that it does not have to use .collect()

 but, for now, it is going to have to use .collect()

 what is the size limit (memory for the driver) of RDD file that
 .collect()
 can work with?

 i've been scouring google-search - S.O., blogs, etc, and everyone is
 cautioning about .collect(), but does not specify how huge is huge...
 are we
 talking about a few gigabytes? terabytes?? petabytes???



 thank you



 --
 Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org


>>>
>


Re: [Spark 2.x Core] .collect() size limit

2018-04-28 Thread Stephen Boesch
While it is certainly possible to use VM, I have seen in a number of places
warnings that collect() results must be able to fit in memory. I'm not
sure if that applies to *all* Spark calculations, but at the very least
each of the specific collect()s that are performed would need to be
verified.

And maybe *all* collects do require sufficient memory - would you like to
check the source code to see if there were disk-backed collects actually
happening for some cases?
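
Not from the Spark source, but a common way around the limit is to pull results to the driver one partition at a time instead of all at once. A minimal PySpark sketch, where handle() is a hypothetical placeholder for whatever the driver needs to do with each record:

# Sketch: iterate partition-by-partition instead of collect().
# Only one partition's worth of records is materialized on the driver at a
# time, so the driver has to fit the largest partition, not the whole RDD.
for record in rdd.toLocalIterator():
    handle(record)  # handle() is a hypothetical placeholder, not a Spark API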

2018-04-28 9:48 GMT-07:00 Deepak Goel :

> There is something as *virtual memory*
>
> On Sat, 28 Apr 2018, 21:19 Stephen Boesch,  wrote:
>
>> Do you have a machine with  terabytes of RAM?  afaik collect() requires
>> RAM - so that would be your limiting factor.
>>
>> 2018-04-28 8:41 GMT-07:00 klrmowse :
>>
>>> i am currently trying to find a workaround for the Spark application i am
>>> working on so that it does not have to use .collect()
>>>
>>> but, for now, it is going to have to use .collect()
>>>
>>> what is the size limit (memory for the driver) of RDD file that
>>> .collect()
>>> can work with?
>>>
>>> i've been scouring google-search - S.O., blogs, etc, and everyone is
>>> cautioning about .collect(), but does not specify how huge is huge...
>>> are we
>>> talking about a few gigabytes? terabytes?? petabytes???
>>>
>>>
>>>
>>> thank you
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>


Re: [Spark 2.x Core] .collect() size limit

2018-04-28 Thread Deepak Goel
There is such a thing as *virtual memory*

On Sat, 28 Apr 2018, 21:19 Stephen Boesch,  wrote:

> Do you have a machine with  terabytes of RAM?  afaik collect() requires
> RAM - so that would be your limiting factor.
>
> 2018-04-28 8:41 GMT-07:00 klrmowse :
>
>> i am currently trying to find a workaround for the Spark application i am
>> working on so that it does not have to use .collect()
>>
>> but, for now, it is going to have to use .collect()
>>
>> what is the size limit (memory for the driver) of RDD file that .collect()
>> can work with?
>>
>> i've been scouring google-search - S.O., blogs, etc, and everyone is
>> cautioning about .collect(), but does not specify how huge is huge... are
>> we
>> talking about a few gigabytes? terabytes?? petabytes???
>>
>>
>>
>> thank you
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: [Spark 2.x Core] .collect() size limit

2018-04-28 Thread Stephen Boesch
Do you have a machine with terabytes of RAM? AFAIK collect() requires RAM
- so that would be your limiting factor.

2018-04-28 8:41 GMT-07:00 klrmowse :

> i am currently trying to find a workaround for the Spark application i am
> working on so that it does not have to use .collect()
>
> but, for now, it is going to have to use .collect()
>
> what is the size limit (memory for the driver) of RDD file that .collect()
> can work with?
>
> i've been scouring google-search - S.O., blogs, etc, and everyone is
> cautioning about .collect(), but does not specify how huge is huge... are
> we
> talking about a few gigabytes? terabytes?? petabytes???
>
>
>
> thank you
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[Spark 2.x Core] .collect() size limit

2018-04-28 Thread klrmowse
I am currently trying to find a workaround for the Spark application I am
working on so that it does not have to use .collect().

But, for now, it is going to have to use .collect().

What is the size limit (memory for the driver) of an RDD that .collect()
can work with?

I've been scouring Google searches - S.O., blogs, etc. - and everyone is
cautioning about .collect(), but nobody specifies how huge is huge... are we
talking about a few gigabytes? Terabytes?? Petabytes???



thank you



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Sequence file to Image in spark

2018-04-28 Thread Selvam Raman
Hi All,

I am trying to convert a sequence file to an image in Spark.

I found that when I was reading a ByteArrayInputStream from the bytes it throws
an exception. Any insight will be helpful.

scala> sc.sequenceFile[NullWritable,BytesWritable]("D:/seqImage").map(x =>
         {ImageIO.write(ImageIO.read(new ByteArrayInputStream(x._2.copyBytes())),
           "png", new File("D:/ima"))}).collect

2018-04-28 15:45:52 ERROR Executor:91 - Exception in task 0.0 in stage 8.0 (TID 14)
java.lang.IllegalArgumentException: image == null!
        at javax.imageio.ImageTypeSpecifier.createFromRenderedImage(Unknown Source)
        at javax.imageio.ImageIO.getWriter(Unknown Source)
        at javax.imageio.ImageIO.write(Unknown Source)
        at $line117.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:31)
        at $line117.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:31)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

2018-04-28 15:45:52 WARN  TaskSetManager:66 - Lost task 0.0 in stage 8.0 (TID 14, localhost, executor driver): java.lang.IllegalArgumentException: image == null!
        at javax.imageio.ImageTypeSpecifier.createFromRenderedImage(Unknown Source)

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Dataframe vs dataset

2018-04-28 Thread Michael Artz
OK, from the language you used, you are kind of saying that Dataset is a
subset of DataFrame. I would disagree, because to me a DataFrame is just a
Dataset of org.apache.spark.sql.Row

On Sat, Apr 28, 2018, 8:34 AM Marco Mistroni  wrote:

> Imho .neither..I see datasets as typed df and therefore ds are enhanced df
> Feel free to disagree..
> Kr
>
> On Sat, Apr 28, 2018, 2:24 PM Michael Artz  wrote:
>
>> Hi,
>>
>> I use Spark everyday and I have a good grip on the basics of Spark, so
>> this question isnt for myself.  But this came up and I wanted to see what
>> other Spark users would say, and I dont want to influence your answer.  And
>> SO is weird about polls. The question is
>>
>>  "Which one do you feel is accurate... Dataset is a subset of DataFrame,
>> or DataFrame a subset of Dataset?"
>>
>


Re: Dataframe vs dataset

2018-04-28 Thread Marco Mistroni
IMHO, neither. I see Datasets as typed DFs, and therefore DSs are enhanced DFs.
Feel free to disagree.
Kr

On Sat, Apr 28, 2018, 2:24 PM Michael Artz  wrote:

> Hi,
>
> I use Spark everyday and I have a good grip on the basics of Spark, so
> this question isnt for myself.  But this came up and I wanted to see what
> other Spark users would say, and I dont want to influence your answer.  And
> SO is weird about polls. The question is
>
>  "Which one do you feel is accurate... Dataset is a subset of DataFrame,
> or DataFrame a subset of Dataset?"
>


Dataframe vs dataset

2018-04-28 Thread Michael Artz
Hi,

I use Spark every day and I have a good grip on the basics of Spark, so this
question isn't for myself. But this came up and I wanted to see what other
Spark users would say, and I don't want to influence your answer. And SO is
weird about polls. The question is

 "Which one do you feel is accurate... Dataset is a subset of DataFrame, or
DataFrame a subset of Dataset?"


Re: A naive ML question

2018-04-28 Thread Jörn Franke
What do you mean by “how it evolved over time”? A transaction basically 
describes an action at a certain point in time. Do you mean how a financial 
product evolved over time, given a set of transactions?

> On 28. Apr 2018, at 12:46, kant kodali  wrote:
> 
> Hi All,
> 
> I have a bunch of financial transactional data and I was wondering if there 
> is any ML model that can give me a graph structure for this data? other 
> words, show how a transaction had evolved over time? 
> 
> Any suggestions or references would help.
> 
> Thanks!
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



A naive ML question

2018-04-28 Thread kant kodali
Hi All,

I have a bunch of financial transaction data and I was wondering if there
is any ML model that can give me a graph structure for this data? In other
words, show how a transaction has evolved over time?

Any suggestions or references would help.

Thanks!