This feels like premature optimization, and it's not clear it optimizes anything, but maybe. Caching data that is used only once is worse than not caching it at all. It looks like a straight line through to the write, so I doubt caching helps anything here.
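A minimal PySpark sketch of what that advice amounts to, assuming a single reused dataframe like the df12 in the pseudocode quoted below (the path, column names and transformations here are placeholders, not taken from the actual job): cache only the dataframe that several writes reuse, leave single-use intermediates uncached, and unpersist before stopping.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("one-day-job").getOrCreate()

    df = spark.read.csv("hdfs://somepath", header=True)

    # Intermediates that feed the pipeline exactly once: Spark will pipeline
    # them straight into the writes, so caching them only costs memory.
    df12 = df.filter("value IS NOT NULL")  # stand-in for the "complex stuff"

    # df12 is reused by three separate writes, so caching it avoids
    # recomputing its whole lineage three times.
    df12.cache()

    df12.groupBy("key").count().write.mode("overwrite").parquet("hdfs://out/df13")
    df12.select("key", "value").write.mode("overwrite").parquet("hdfs://out/df14")
    df12.dropDuplicates().write.mode("overwrite").parquet("hdfs://out/df15")

    # Release the cached blocks before the job ends.
    df12.unpersist()
    spark.stop()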
On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <joris.bil...@bigindustries.be> wrote:

> Hi,
> as said, thanks for the little discussion over mail.
> I understand that the action is triggered at the end, at the write, and then all of a sudden everything is executed at once. But I don't really need to trigger an action before that. I am caching somewhere a df that will be reused several times (slightly updated pseudocode below).
>
> Question: is it then better practice to already trigger some actions on the intermediate data frames (like df4 and df8), and cache them? So that these actions will not be that expensive yet, and the writes at the end will require fewer resources, which would allow processing more days in one go? Like what is added in red in the improvement section in the pseudocode below?
>
> pseudocode:
>
> loop over all days:
>     spark submit 1 day
>
> with spark submit (overly simplified) =
>
>     df=spark.read(hdfs://somepath)
>     …
>     ##IMPROVEMENT START
>     df4=spark.sql(some stuff with df3)
>     spark.sql(CACHE TABLE df4)
>     …
>     df8=spark.sql(some stuff with df7)
>     spark.sql(CACHE TABLE df8)
>     ##IMPROVEMENT END
>     ...
>     df12=spark.sql(complex stuff with df11)
>     spark.sql(CACHE TABLE df12)
>     ...
>     df13=spark.sql(complex stuff with df12)
>     df13.write
>     df14=spark.sql(some other complex stuff with df12)
>     df14.write
>     df15=spark.sql(some completely other complex stuff with df12)
>     df15.write
>
> Thanks!
>
> On 31 Mar 2022, at 14:37, Sean Owen <sro...@gmail.com> wrote:
>
> If that is your loop unrolled, then you are not doing parts of the work at a time. That will execute all operations in one go when the write finally happens. That's OK, but it may be part of the problem. For example, if you are filtering for a subset, processing, and unioning, then that is just a harder and slower way of applying the transformation to all the data at once.
>
> On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <joris.bil...@bigindustries.be> wrote:
>
>> Thanks for the reply :-)
>>
>> I am using pyspark. Basically my code (simplified) is:
>>
>> df=spark.read.csv(hdfs://somehdfslocation)
>> df1=spark.sql(complex statement using df)
>> ...
>> dfx=spark.sql(complex statement using df x-1)
>> ...
>> dfx15.write()
>>
>> What exactly is meant by "closing resources"? Is it just unpersisting cached dataframes at the end and stopping the spark context explicitly: sc.stop()?
>>
>> For processing many years at once versus a chunk in a loop: I see that if I go up to a certain number of days, one iteration will start to have tasks that fail. So I only take a limited number of days, and do this process several times. Isn't this normal, as you are always somehow limited in terms of resources (I have 9 nodes with 32GB)? Or is it the case that in theory you could process any volume, if you wait long enough? I guess Spark can only break the tasks down to a certain level (based on the datasets' and the intermediate results' partitions), and at some point you hit the limit where your resources are not sufficient anymore to process one such task? Maybe you can tweak it a bit, but in the end you'll hit a limit?
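As an aside on the failing tasks mentioned just above: one commonly tweaked knob is the number of partitions, so that individual tasks stay small even as the processed date range grows. A minimal sketch with purely illustrative values (the path, the partition count and the "day" column are assumptions, not taken from this job):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Raise the number of shuffle partitions so each shuffle task handles less data.
    spark.conf.set("spark.sql.shuffle.partitions", "800")

    # Repartition the input on a finer-grained key for the same reason.
    df = spark.read.parquet("hdfs://somepath").repartition(800, "day")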
>>
>> Concretely, the following topics would be interesting to find out more about (links):
>> - where to see what you are still consuming after a spark job has ended, if you didn't close resources
>> - memory leaks for pyspark
>> - a good article about closing resources (you find tons of snippets on how to start a spark context plus config for number/cores/memory of workers/executors etc., but I never saw a focus on making sure you clean up; or is it just stopping the spark context?)
>>
>> On 30 Mar 2022, at 21:24, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:
>>
>> It's quite impossible for anyone to answer your question about what is eating your memory without even knowing what language you are using.
>>
>> If you are using C, then it's always pointers; that's the mem issue.
>> If you are using Python, it can be something like not using a context manager; see "Context Managers and Python's with Statement" <https://realpython.com/python-with-statement/>.
>>
>> Another can be not closing resources after use.
>>
>> In my experience you can process 3 years or more of data, IF you are closing opened resources.
>> I use the web GUI http://spark:4040 to follow what Spark is doing.
>>
>> On Wed, 30 Mar 2022 at 17:41, Joris Billen <joris.bil...@bigindustries.be> wrote:
>>
>>> Thanks for the answer, much appreciated! This forum is very useful :-)
>>>
>>> I didn't know the spark context stays alive. I guess this is eating up memory. The eviction means that it knows it should clear some of the old cached memory to be able to store new data. In case anyone has good articles about memory leaks, I would be interested to read them.
>>> I will try to add the following lines at the end of my job (as I cached the table in Spark SQL):
>>>
>>> sqlContext.sql("UNCACHE TABLE mytableofinterest")
>>> spark.stop()
>>>
>>> Wrt looping: if I want to process 3 years of data, my modest cluster will never do it in one go, I would expect? I have to break it down into smaller pieces and run that in a loop (1 day is already lots of data).
>>>
>>> Thanks!
>>>
>>> On 30 Mar 2022, at 17:25, Sean Owen <sro...@gmail.com> wrote:
>>>
>>> The Spark context does not stop when a job does. It stops when you stop it. There could be many ways memory can leak. Caching, maybe, but it will evict. You should be clearing caches when they are no longer needed.
>>>
>>> I would guess it is something else your program holds on to in its logic.
>>>
>>> Also consider not looping; there is probably a faster way to do it in one go.
>>>
>>> On Wed, Mar 30, 2022, 10:16 AM Joris Billen <joris.bil...@bigindustries.be> wrote:
>>>
>>>> Hi,
>>>> I have a pyspark job submitted through spark-submit that does some heavy processing for 1 day of data. It runs with no errors. I have to loop over many days, so I run this spark job in a loop. I notice that after a couple of executions the memory is increasing on all worker nodes, and eventually this leads to failures.
>>>> My job does some caching, but I understand that when the job ends successfully, the spark context is destroyed and the cache should be cleared. However, it seems that something keeps filling the memory a bit more after each run. This is the memory behaviour over time, which in the end will start leading to failures:
>>>>
>>>> (what we see is: green = physical memory used, green-blue = physical memory cached, grey = memory capacity, a straight line at around 31GB)
>>>> This runs on a healthy Spark 2.4 and was already optimized to reach a stable job in terms of spark-submit resource parameters (driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait).
>>>> Any clue how to "really" clear the memory in between jobs? Basically, at the moment I can loop 10x and then need to restart my cluster so all memory is cleared completely.
>>>>
>>>> Thanks for any info!
>>>>
>>>> <Screenshot 2022-03-30 at 15.28.24.png>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
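To close the loop on the question that started the thread, a minimal sketch of the per-day job with the cleanup discussed above (the path, the day argument and the cached view name mirror the simplified examples in the thread; the real transformations are elided): clear all cached tables and stop the session in a finally block, so each spark-submit hands its memory back before the next day runs.

    import sys
    from pyspark.sql import SparkSession

    def main(day):
        spark = SparkSession.builder.appName("daily-" + day).getOrCreate()
        try:
            df = spark.read.csv("hdfs://somepath/" + day, header=True)
            df.createOrReplaceTempView("mytableofinterest")
            spark.sql("CACHE TABLE mytableofinterest")
            # ... the complex SQL statements and the writes for this day ...
        finally:
            # Drop every cached table and stop the context explicitly, rather
            # than relying on eviction, so the executors release their memory
            # before the next spark-submit in the loop starts.
            spark.catalog.clearCache()
            spark.stop()

    if __name__ == "__main__":
        main(sys.argv[1])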