Hi,
as said, thanks for the little discussion over mail.
I understand that the action is only triggered at the end by the write, and that everything is then executed at once. But I don't really need to trigger an action before that. I am caching a df somewhere that will be reused several times (slightly updated pseudocode below).

Question: is it then better practice to already trigger some actions on intermediate data frames (like df4 and df8) and cache them, so that those actions are still cheap and the writes at the end need fewer resources, which would allow processing more days in one go? Like what is added in the IMPROVEMENT section of the pseudocode below (see also the sketch after the pseudocode)?



pseudocode:

loop over all days:
    spark-submit 1 day

with spark-submit (overly simplified) =

  df = spark.read(hdfs://somepath)
  ...
  ## IMPROVEMENT START
  df4 = spark.sql(some stuff with df3)
  spark.sql("CACHE TABLE df4")
  ...
  df8 = spark.sql(some stuff with df7)
  spark.sql("CACHE TABLE df8")
  ## IMPROVEMENT END
   ...
  df12 = spark.sql(complex stuff with df11)
  spark.sql("CACHE TABLE df12")
  ...
  df13 = spark.sql(complex stuff with df12)
  df13.write
  df14 = spark.sql(some other complex stuff with df12)
  df14.write
  df15 = spark.sql(some completely other complex stuff with df12)
  df15.write
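
For reference, a minimal PySpark sketch (not the real job; all names, paths and queries are placeholders) of what the IMPROVEMENT idea looks like with the DataFrame API: persist() is lazy, so a cheap action such as count() is what actually materializes the cache before the expensive writes reuse it, and unpersist() frees the executors afterwards.

# Minimal sketch: cache an intermediate DataFrame that is reused several times,
# materialize it once with a cheap action, and release it when the writes are done.
# "df12", the paths and the queries are placeholders standing in for the pseudocode above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("one-day-job").getOrCreate()

df11 = spark.read.parquet("hdfs://somepath")      # placeholder input
df12 = df11.filter("day = '2022-03-30'")          # placeholder for the complex stuff

df12.persist()      # lazy: only marks the DataFrame for caching
df12.count()        # cheap action that materializes the cache now
df12.createOrReplaceTempView("df12")

df13 = spark.sql("SELECT day, count(*) AS n FROM df12 GROUP BY day")  # stands in for the real queries
df13.write.mode("overwrite").parquet("hdfs://somepath_out/df13")

df12.unpersist()    # free executor memory once all writes that reuse df12 are done
spark.stop()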






Thanks!



On 31 Mar 2022, at 14:37, Sean Owen <sro...@gmail.com> wrote:

If that is your loop unrolled, then you are not doing parts of work at a time. 
That will execute all operations in one go when the write finally happens. 
That's OK, but may be part of the problem. For example if you are filtering for 
a subset, processing, and unioning, then that is just a harder and slower way 
of applying the transformation to all data at once.
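
(Not from the original mails, just a hypothetical illustration of that pattern, assuming a "day" column and a toy transformation:)

# Hypothetical illustration: filtering one subset at a time, transforming each
# slice and unioning the results is usually a slower way of expressing a single
# transformation over all the data at once.
from functools import reduce
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("hdfs://somepath")   # placeholder input with "day" and "x" columns

days = ["2022-03-28", "2022-03-29", "2022-03-30"]

# harder and slower: per-day slices, processed separately, then unioned
slices = [df.filter(F.col("day") == d).withColumn("x2", F.col("x") * 2) for d in days]
result_union = reduce(lambda a, b: a.union(b), slices)

# same result in one pass over all the data
result_single = df.filter(F.col("day").isin(days)).withColumn("x2", F.col("x") * 2)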

On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <joris.bil...@bigindustries.be> wrote:
Thanks for the reply :-)

I am using pyspark. Basically my code (simplified) is:

df = spark.read.csv("hdfs://somehdfslocation")
df1 = spark.sql(complex statement using df)
...
dfx = spark.sql(complex statement using df x-1)
...
dfx15.write()


What exactly is meant by "closing resources"? Is it just unpersisting cached DataFrames at the end and stopping the Spark context explicitly with sc.stop()?
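
For concreteness, a sketch of what I mean by that cleanup at the end of one job (table and DataFrame names are placeholders):

# End-of-job cleanup sketch (placeholder names; assumes a temp view cached via
# Spark SQL and a DataFrame cached via persist()).
spark.sql("UNCACHE TABLE mytableofinterest")   # drop one cached table
df12.unpersist()                               # drop one cached DataFrame
spark.catalog.clearCache()                     # or: drop everything still cached
spark.stop()                                   # stop the Spark context / release resources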


For processing many years at once versus a chunk in a loop: I see that if I go up to a certain number of days, one iteration starts to have tasks that fail. So I only take a limited number of days and repeat this process several times. Isn't this normal, as you are always somehow limited in terms of resources (I have 9 nodes with 32GB)? Or could you in theory process any volume, as long as you wait long enough? I guess Spark can only break the work down to a certain level (based on the datasets' and the intermediate results' partitions), and at some moment you hit the limit where your resources are no longer sufficient to process one such task? Maybe you can tweak it a bit, but in the end you'll hit a limit?
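
A related knob (just a hedged aside, not something already discussed in this thread): the size of a single task follows from the number of partitions, so raising the parallelism keeps each task small. Numbers below are illustrative only:

# Sketch: more, smaller partitions so no single task has to hold too much data.
spark.conf.set("spark.sql.shuffle.partitions", "800")  # partitions created by joins/aggregations
df = spark.read.parquet("hdfs://somepath")              # placeholder input
df = df.repartition(800)                                # more, smaller partitions before heavy work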



Concretely, the following topics would be interesting to find out more about (links welcome):
-where to see what you are still consuming after a Spark job has ended if you didn't close resources
-memory leaks in pyspark
-a good article about closing resources (you find tons of snippets on how to start a Spark context + config for number/cores/memory of workers/executors etc., but I never saw a focus on making sure you clean up -> or is it just stopping the Spark context?)




On 30 Mar 2022, at 21:24, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

It's quite impossible for anyone to answer your question about what is eating your memory without even knowing what language you are using.

If you are using C, then it's always pointers; that's the memory issue.
If you are using Python, it can be something like not using a context manager; see With Context Managers and Python's with Statement <https://realpython.com/python-with-statement/>.
And another can be not closing resources after use.
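
As a hedged illustration of that idea (not from the original mail): a small context manager around the SparkSession guarantees that caches are cleared and the session is stopped even when the per-day processing fails. Paths and names are placeholders.

# Illustrative only: a context manager that always cleans up the SparkSession.
from contextlib import contextmanager
from pyspark.sql import SparkSession

@contextmanager
def spark_session(app_name):
    spark = SparkSession.builder.appName(app_name).getOrCreate()
    try:
        yield spark
    finally:
        spark.catalog.clearCache()   # drop anything still cached
        spark.stop()                 # release driver and executor resources

with spark_session("one-day-job") as spark:
    df = spark.read.parquet("hdfs://somepath")              # placeholder path
    df.write.mode("overwrite").parquet("hdfs://somepath_out")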

In my experience you can process 3 years or more of data, IF you are closing opened resources.
I use the web GUI at http://spark:4040 to follow what Spark is doing.




On Wed, 30 Mar 2022 at 17:41, Joris Billen <joris.bil...@bigindustries.be> wrote:
Thanks for the answer, much appreciated! This forum is very useful :-)

I didn't know the Spark context stays alive. I guess this is eating up memory. The eviction means that Spark knows it should clear some of the old cached data to be able to store new data. In case anyone has good articles about memory leaks, I would be interested to read them.
I will try to add the following lines at the end of my job (as I cached the table in Spark SQL):


sqlContext.sql("UNCACHE TABLE mytableofinterest ")
spark.stop()


Wrt looping: if I want to process 3 years of data, my modest cluster will never do it in one go, I would expect? I have to break it down into smaller pieces and run those in a loop (1 day is already a lot of data), as sketched below.
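
For reference, this is roughly what I mean by that outer loop, written as a small driver script (script name, dates and arguments are placeholders); each day is a separate spark-submit, so every run starts from a fresh JVM:

# Sketch of the outer loop: one spark-submit per day (placeholder script and dates).
import subprocess
from datetime import date, timedelta

start, ndays = date(2019, 1, 1), 3 * 365           # roughly 3 years, illustrative
for i in range(ndays):
    day = (start + timedelta(days=i)).isoformat()
    subprocess.run(
        ["spark-submit", "--master", "yarn", "process_one_day.py", day],
        check=True,                                 # stop looping if one day fails
    )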



Thanks!




On 30 Mar 2022, at 17:25, Sean Owen <sro...@gmail.com> wrote:

The Spark context does not stop when a job does. It stops when you stop it. 
There could be many ways mem can leak. Caching maybe - but it will evict. You 
should be clearing caches when no longer needed.

I would guess it is something else your program holds on to in its logic.

Also consider not looping; there is probably a faster way to do it in one go.

On Wed, Mar 30, 2022, 10:16 AM Joris Billen <joris.bil...@bigindustries.be> wrote:
Hi,
I have a pyspark job submitted through spark-submit that does some heavy processing for 1 day of data. It runs with no errors. I have to loop over many days, so I run this Spark job in a loop. I notice that after a couple of executions the memory is increasing on all worker nodes, and eventually this leads to failures. My job does some caching, but I understand that when the job ends successfully the Spark context is destroyed and the cache should be cleared. However, it seems that something keeps on filling the memory a bit more after each run. This is the memory behaviour over time, which in the end will start leading to failures:
[attached: memory usage chart over time]

(What we see is: green = physical memory used, green-blue = physical memory cached, grey = memory capacity, a straight line around 31GB.)
This runs on a healthy Spark 2.4 cluster and was already optimized to a stable job in terms of spark-submit resource parameters (driver-memory, num-executors, executor-memory, executor-cores, spark.locality.wait).
Any clue how to "really" clear the memory in between jobs? Basically, at the moment I can loop 10x and then need to restart my cluster so all memory is cleared completely.


Thanks for any info!




--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297

