To initialize it per executor, I used a class with only class attributes and 
class methods (like an `object` in Scala), but because I was using PySpark, it 
was actually being initialized per process ☹
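Roughly, the pattern looked like this (illustrative names only, not my actual 
code; build_graph_from_file and the path are placeholders):

class GraphHolder(object):
    # Scala-"object"-style holder: only class attributes and class methods
    graph = None  # built lazily, once per Python interpreter

    @classmethod
    def get_graph(cls):
        if cls.graph is None:
            # build_graph_from_file is a placeholder for the expensive (5-10 s) build
            cls.graph = build_graph_from_file("/path/to/graph/file")
        return cls.graph

Since PySpark runs tasks in separate Python worker processes, each process gets 
its own copy of the class attribute, so the build ends up happening once per 
process rather than once per executor.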
What I went for instead was a broadcast variable, but there is still something 
suspicious about my application: the processing time of each batch.

In my logs, I see that when I process a partition, it takes under a second. But 
in the Spark UI I see that a task takes between 3 and 6 seconds.
Shouldn't the partition process time and the task computing time be the same?

My code:


import time
from pyspark import RDD

def process_func(obj, records):
    start = time.time()
    processed_records = ...  # Some processing
    logger.info("It took {0} seconds to handle records".format(time.time() - start))  # This logs very small numbers (around 0.05 seconds)
    return processed_records

def handle_rdd(rdd: RDD):
    start_time = time.time()
    rdd.foreachPartition(lambda records: process_func(object_broadcast.value, records))
    logger.info("Handle RDD took: {0} seconds".format(time.time() - start_time))  # This logs much bigger numbers (around 3-6 seconds)

ssc.union(*streams)\
    .filter(lambda x: x[1] is not None)\
    .map(lambda x: x[1])\
    .foreachRDD(handle_rdd)  # Keep only values and cast them to TextAnalysis
ssc.start()
ssc.awaitTermination()


Each RDD has at most 10 partitions, which means it should take around 0.5 
seconds for all the tasks to be processed.
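(That estimate is just 10 partitions × ~0.05 seconds per partition ≈ 0.5 
seconds.) For reference, the partition count could be logged inside handle_rdd 
with something like:

    logger.info("Partitions in this RDD: {0}".format(rdd.getNumPartitions()))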

Does anyone know what's happening here? The time difference is too big to just 
be networking, right?

From: Sudev A C [mailto:sudev...@go-mmt.com]
Sent: Monday, July 3, 2017 7:48 PM
To: Sidney Feiner <sidney.fei...@startapp.com>; user@spark.apache.org
Subject: Re: [PySpark] - running processes

You might want to do the initialisation per partition (I'm not sure how you are 
achieving the per-executor initialisation in your code).

To initialise something per partition, you may use something like 
rdd.foreachPartition.
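Something along these lines (just a sketch; init_heavy_object and handle are 
placeholders for your own code):

def process_partition(records):
    obj = init_heavy_object()   # runs once per partition, on the executor
    for record in records:
        handle(obj, record)     # per-record processing

rdd.foreachPartition(process_partition)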

Or, if you want something global, like a variable for further processing, you 
might want to initialise it once as a broadcast variable and access the data 
structure through the broadcast variable.
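Roughly (again just a sketch; build_graph and handle are placeholders):

graph_broadcast = sc.broadcast(build_graph())   # built once, on the driver

def process_partition(records):
    graph = graph_broadcast.value               # fetched/deserialised on the worker
    for record in records:
        handle(graph, record)

rdd.foreachPartition(process_partition)

Note that the broadcast object has to be picklable so the workers can 
deserialise it.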

AFAIK, a Python process will be started for the per-partition tasks.
On Mon, 3 Jul 2017 at 5:23 PM, Sidney Feiner 
<sidney.fei...@startapp.com<mailto:sidney.fei...@startapp.com>> wrote:

In my Spark Streaming application, I need to build a graph from a file, and 
initializing that graph takes between 5 and 10 seconds.

So I tried initializing it once per executor so that it would be initialized only once.

After running the application, I noticed that it's initialized much more than 
once per executor, every time with a different process id (every process has 
its own logger).

Doesn't every executor have its own JVM and its own process? Or is that only 
relevant when I develop in JVM languages like Scala/Java? Do executors in 
PySpark spawn new processes for new tasks?

And if they do, how can I make sure that my graph object will really only be 
initialized once?
Thanks :)


Sidney Feiner / SW Developer
M: +972.528197720 / Skype: sidney.feiner.startapp





