RE: [PySpark - 1.6] - Avoid object serialization

2017-01-01 Thread Sidney Feiner
Thanks everybody, but I've found another way of doing it.
Because I didn't actually need an instance of my class, I made the class
"static": all variables are initialized as class variables and all methods
are class methods, so there is no instance to serialize.
Thanks a lot anyway, I hope my answer will also help someone one day ☺
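
In case it helps, a rough sketch of what the class looks like now (heavily
simplified - the real graph loading and analysis logic are omitted, and
_load_graph is just a placeholder name):

class ShortTextAnalyzer(object):
    # Everything lives on the class itself, so there is no heavy instance
    # that has to be pickled and shipped with every task.
    root_dir = None
    graph = None
    cache = {}

    @classmethod
    def init(cls, root_dir):
        # One-time setup: keep the heavy graph as a class attribute.
        cls.root_dir = root_dir
        cls.graph = cls._load_graph(root_dir)  # placeholder helper

    @classmethod
    def analyze_short_text_event(cls, text):
        # Works directly against the class-level graph and cache.
        ...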

Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp

[StartApp]<http://www.startapp.com/>

From: Holden Karau [mailto:hol...@pigscanfly.ca]
Sent: Thursday, December 29, 2016 8:54 PM
To: Chawla,Sumit <sumitkcha...@gmail.com>; Eike von Seggern 
<eike.segg...@sevenval.com>
Cc: Sidney Feiner <sidney.fei...@startapp.com>; user@spark.apache.org
Subject: Re: [PySpark - 1.6] - Avoid object serialization

Alternatively, using the broadcast functionality can also help with this.

On Thu, Dec 29, 2016 at 3:05 AM Eike von Seggern <eike.segg...@sevenval.com> wrote:
2016-12-28 20:17 GMT+01:00 Chawla,Sumit <sumitkcha...@gmail.com>:
Would this work for you?

def processRDD(rdd):
    analyzer = ShortTextAnalyzer(root_dir)
    rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1]))

ssc.union(*streams).filter(lambda x: x[1] != None) \
    .foreachRDD(lambda rdd: processRDD(rdd))

I think this will still serialize the analyzer and send it to every
executor that holds partitions of the RDD.

Maybe you can work around this with `RDD.foreachPartition()`:

def processRDD(rdd):
    def partition_func(records):
        analyzer = ShortTextAnalyzer(root_dir)
        for record in records:
            analyzer.analyze_short_text_event(record[1])
    rdd.foreachPartition(partition_func)

This will create one analyzer per partition and RDD.

Best

Eike


Re: [PySpark - 1.6] - Avoid object serialization

2016-12-29 Thread Holden Karau
Alternatively, using the broadcast functionality can also help with this.
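
For example, something roughly like this (just a sketch - it assumes the
analyzer can be pickled once up front; sc is the SparkContext and the other
names mirror the earlier snippets):

analyzer_bc = sc.broadcast(ShortTextAnalyzer(root_dir))

def process_record(record):
    # The broadcast value is shipped to each executor once and reused there,
    # instead of being serialized into every task closure.
    analyzer_bc.value.analyze_short_text_event(record[1])

ssc.union(*streams).filter(lambda x: x[1] != None) \
    .foreachRDD(lambda rdd: rdd.foreach(process_record))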

On Thu, Dec 29, 2016 at 3:05 AM Eike von Seggern wrote:

> 2016-12-28 20:17 GMT+01:00 Chawla,Sumit :
>
> Would this work for you?
>
> def processRDD(rdd):
>     analyzer = ShortTextAnalyzer(root_dir)
>     rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1]))
>
> ssc.union(*streams).filter(lambda x: x[1] != None) \
>     .foreachRDD(lambda rdd: processRDD(rdd))
>
>
> I think this will still serialize the analyzer and send it to every
> executor that holds partitions of the RDD.
>
> Maybe you can work around this with `RDD.foreachPartition()`:
>
> def processRDD(rdd):
>     def partition_func(records):
>         analyzer = ShortTextAnalyzer(root_dir)
>         for record in records:
>             analyzer.analyze_short_text_event(record[1])
>     rdd.foreachPartition(partition_func)
>
> This will create one analyzer per partition and RDD.
>
> Best
>
> Eike
>


Re: [PySpark - 1.6] - Avoid object serialization

2016-12-29 Thread Eike von Seggern
2016-12-28 20:17 GMT+01:00 Chawla,Sumit :

> Would this work for you?
>
> def processRDD(rdd):
>     analyzer = ShortTextAnalyzer(root_dir)
>     rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1]))
>
> ssc.union(*streams).filter(lambda x: x[1] != None) \
>     .foreachRDD(lambda rdd: processRDD(rdd))
>

I think this will still serialize the analyzer and send it to every
executor that holds partitions of the RDD.

Maybe you can work around this with `RDD.foreachPartition()`:

def processRDD(rdd):
    def partition_func(records):
        # One analyzer per partition, created on the executor itself,
        # so it is never pickled and shipped from the driver.
        analyzer = ShortTextAnalyzer(root_dir)
        for record in records:
            analyzer.analyze_short_text_event(record[1])
    rdd.foreachPartition(partition_func)

This will create one analyzer per partition and RDD.

Best

Eike


Re: [PySpark - 1.6] - Avoid object serialization

2016-12-28 Thread Chawla,Sumit
Would this work for you?

def processRDD(rdd):
    analyzer = ShortTextAnalyzer(root_dir)
    rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1]))

ssc.union(*streams).filter(lambda x: x[1] != None) \
    .foreachRDD(lambda rdd: processRDD(rdd))



Regards
Sumit Chawla


On Wed, Dec 28, 2016 at 7:57 AM, Sidney Feiner wrote:

> Hey,
>
> I just posted this question on Stack Overflow (link here)
> and decided to try my luck here as well ☺
>
>
>
> I'm writing a PySpark job, but I've run into some performance issues.
> Basically, all it does is read events from Kafka and log the
> transformations made. The thing is, the transformation is computed by a
> method of an object, and that object is pretty heavy: it contains a graph
> and an inner cache that is updated automatically as it processes RDDs. So
> when I write the following piece of code:
>
> analyzer = ShortTextAnalyzer(root_dir)
>
> logger.info("Start analyzing the documents from kafka")
>
> ssc.union(*streams).filter(lambda x: x[1] != None).foreachRDD(
>     lambda rdd: rdd.foreach(
>         lambda record: analyzer.analyze_short_text_event(record[1])))
>
>
>
> This serializes my analyzer, which takes a lot of time because of the
> graph, and since a fresh copy is shipped to the executors for every RDD,
> the cache is only relevant for that specific RDD.
>
> If the job were written in Scala, I could have used an Object, which would
> exist once per executor, and then my object wouldn't have to be serialized
> each time.
>
> I've read in a post (http://www.spark.tc/deserialization-in-pyspark-storage/)
> that prior to PySpark 2.0, objects are always serialized. So does that mean
> that I have no way to avoid the serialization?
>
> I'd love to hear about a way to avoid serialization in PySpark if one
> exists: a way to have my object created once per executor, so it could skip
> the serialization step, save time, and actually have a working cache.
>
> Thanks in advance :)
>
> Sidney Feiner   /  SW Developer
>
> M: +972.528197720  /  Skype: sidney.feiner.startapp
>
>
>
> [image: StartApp] 
>
>
>