This feels like premature optimization, and it's not clear it is optimizing
anything, but maybe.
Caching things that are used only once is worse than not caching at all. It
looks like a straight line through to the write, so I doubt caching helps
anything here.
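
A minimal sketch of the pattern that does help, caching only the frame that
several writes reuse and releasing it afterwards, would look roughly like the
following (toy data, placeholder output paths, untested):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("cache-only-what-is-reused").getOrCreate()

# Stand-in for the real one-day input; in the job this would be spark.read(...).
df = spark.createDataFrame([("a", 1), ("a", 2), ("b", 3)], ["key", "value"])

# Stand-in for the chain df1..df12: transformations are lazy, nothing runs yet.
df12 = df.withColumn("value2", F.col("value") * 2)

# Cache only the frame that more than one action below will read.
df12.cache()

# Three separate writes (actions); the second and third reuse the cached df12
# instead of recomputing the whole lineage.
df12.filter(F.col("key") == "a").write.mode("overwrite").parquet("/tmp/out_df13")
df12.groupBy("key").agg(F.sum("value2").alias("total")).write.mode("overwrite").parquet("/tmp/out_df14")
df12.select("key", "value2").write.mode("overwrite").parquet("/tmp/out_df15")

# Release the cached blocks and stop the session before the next day's run.
df12.unpersist()
spark.stop()

Note that cache() is lazy: the first write pays the cost of materializing df12,
and the later writes then read the cached copy.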

On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <joris.bil...@bigindustries.be>
wrote:

> Hi,
> as said, thanks for the little discussion over mail.
> I understand that the action is triggered at the end, at the write, and then
> all of a sudden everything is executed at once. But I don't really need to
> trigger an action before that. I am caching, somewhere, a df that will be
> reused several times (slightly updated pseudocode below).
>
> Question: is it then better practice to already trigger some actions on the
> intermediate data frames (like df4 and df8), and cache them? So that those
> actions are not as expensive yet, and the writes at the end will require
> fewer resources, which would allow processing more days in one go? Like
> what is added in the improvement section of the pseudocode below?
>
>
>
> pseudocode:
>
>
> loop over all days:
>     spark-submit 1 day
>
>
>
> with spark-submit (overly simplified) =
>
>
>   df = spark.read(hdfs://somepath)
>   …
>   ## IMPROVEMENT START
>   df4 = spark.sql(some stuff with df3)
>   spark.sql(CACHE TABLE df4)
>   …
>   df8 = spark.sql(some stuff with df7)
>   spark.sql(CACHE TABLE df8)
>   ## IMPROVEMENT END
>   ...
>   df12 = spark.sql(complex stuff with df11)
>   spark.sql(CACHE TABLE df12)
>   ...
>   df13 = spark.sql(complex stuff with df12)
>   df13.write
>   df14 = spark.sql(some other complex stuff with df12)
>   df14.write
>   df15 = spark.sql(some completely other complex stuff with df12)
>   df15.write
>
>
>
>
>
>
> Thanks!
>
>
>
> On 31 Mar 2022, at 14:37, Sean Owen <sro...@gmail.com> wrote:
>
> If that is your loop unrolled, then you are not doing parts of the work at a
> time. That will execute all operations in one go when the write finally
> happens. That's OK, but it may be part of the problem. For example, if you
> are filtering for a subset, processing it, and unioning the results, then
> that is just a harder and slower way of applying the transformation to all
> the data at once.
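>
> To make that concrete with a toy example (not your actual code; the column
> names and values are made up):
>
> from pyspark.sql import SparkSession, functions as F
>
> spark = SparkSession.builder.getOrCreate()
> # Toy stand-in for one big input with a "day" column.
> df = spark.createDataFrame(
>     [("2022-03-28", 1), ("2022-03-29", 2), ("2022-03-30", 3)], ["day", "value"]
> )
>
> # Harder and slower: carve out each subset, transform it, union the pieces back.
> days = ["2022-03-28", "2022-03-29", "2022-03-30"]
> parts = [
>     df.filter(F.col("day") == d).withColumn("value2", F.col("value") * 2)
>     for d in days
> ]
> result_a = parts[0].unionByName(parts[1]).unionByName(parts[2])
>
> # Same result, expressed as one transformation over all the data, which Spark
> # can plan and execute in a single pass.
> result_b = df.filter(F.col("day").isin(days)).withColumn("value2", F.col("value") * 2)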
>
> On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <
> joris.bil...@bigindustries.be> wrote:
>
>> Thanks for the reply :-)
>>
>> I am using pyspark. Basically, my code (simplified) is:
>>
>> df = spark.read.csv("hdfs://somehdfslocation")
>> df1 = spark.sql(complex statement using df)
>> ...
>> dfx = spark.sql(complex statement using df(x-1))
>> ...
>> dfx15.write()
>>
>>
>> What exactly is meant by "closing resources"? Is it just unpersisting
>> cached dataframes at the end and stopping the spark context explicitly:
>> sc.stop()?
>>
>>
>> For processing many years at once versus a chunk in a loop: I see that if
>> I go up to a certain number of days, one iteration will start to have tasks
>> that fail. So I only take a limited number of days, and repeat the process
>> several times. Isn't this normal, as you are always somehow limited in terms
>> of resources (I have 9 nodes with 32GB)? Or could you, in theory, process
>> any volume if you wait long enough? I guess Spark can only break the work
>> down into tasks up to a certain level (based on the datasets' and the
>> intermediate results' partitions), and at some point you hit the limit where
>> your resources are no longer sufficient to process one such task? Maybe you
>> can tweak it a bit, but in the end you'll hit a limit?
>>
>>
>>
>> Concretely, the following topics would be interesting to find out more
>> about (links welcome):
>> - where to see what you are still consuming after a Spark job has ended, if
>> you didn't close resources
>> - memory leaks in pyspark
>> - a good article about closing resources (you find tons of snippets on how
>> to start a Spark context + config for the number/cores/memory of
>> workers/executors etc., but I never saw a focus on making sure you clean up
>> -> or is it just stopping the Spark context?)
>>
>>
>>
>>
>> On 30 Mar 2022, at 21:24, Bjørn Jørgensen <bjornjorgen...@gmail.com>
>> wrote:
>>
>> It's quite impossible for anyone to answer your question about what is
>> eating your memory without even knowing which language you are using.
>>
>> If you are using C, then it's always pointers; that's the memory issue.
>> If you are using Python, one cause can be not using context managers; see
>> "Context Managers and Python's with Statement"
>> <https://realpython.com/python-with-statement/>
>>
>> Another one can be not closing resources after use.
>>
>> In my experience you can process 3 years or more of data, IF you are
>> closing the resources you opened.
>> I use the web GUI at http://spark:4040 to follow what Spark is doing.
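>>
>> Roughly what I mean by closing resources, as an untested sketch (the paths
>> and names are placeholders, not your job):
>>
>> from pyspark.sql import SparkSession
>>
>> def run_one_day(day: str) -> None:
>>     spark = SparkSession.builder.appName(f"daily-{day}").getOrCreate()
>>     try:
>>         df = spark.read.parquet(f"hdfs://input/day={day}")   # placeholder path
>>         df.cache()
>>         df.write.mode("overwrite").parquet(f"hdfs://output/day={day}")
>>     finally:
>>         # Drop all cached tables/DataFrames and release the driver and
>>         # executors, even if something above raised.
>>         spark.catalog.clearCache()
>>         spark.stop()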
>>
>>
>>
>>
>> ons. 30. mar. 2022 kl. 17:41 skrev Joris Billen <
>> joris.bil...@bigindustries.be>:
>>
>>> Thanks for the answer, much appreciated! This forum is very useful :-)
>>>
>>> I didn't know the SparkContext stays alive. I guess this is what is eating
>>> up memory. The eviction means that Spark knows it should clear some of the
>>> old cached data to be able to store new data. In case anyone has good
>>> articles about memory leaks, I would be interested to read them.
>>> I will try to add the following lines at the end of my job (as I cached
>>> the table in Spark SQL):
>>>
>>>
>>> sqlContext.sql("UNCACHE TABLE mytableofinterest")
>>> spark.stop()
>>>
>>>
>>> Wrt looping: if I want to process 3 years of data, my modest cluster will
>>> never do it in one go, I would expect? I have to break it down into smaller
>>> pieces and run those in a loop (1 day is already a lot of data).
>>>
>>>
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>> On 30 Mar 2022, at 17:25, Sean Owen <sro...@gmail.com> wrote:
>>>
>>> The Spark context does not stop when a job does. It stops when you stop
>>> it. There could be many ways memory can leak. Caching, maybe, but it will
>>> evict. You should be clearing caches when they are no longer needed.
>>>
>>> I would guess it is something else your program holds on to in its logic.
>>>
>>> Also consider not looping; there is probably a faster way to do it in one
>>> go.
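>>>
>>> As a rough illustration of "one go" (untested; the paths and the "day"
>>> partition column are made up):
>>>
>>> from pyspark.sql import SparkSession, functions as F
>>>
>>> spark = SparkSession.builder.appName("all-days-in-one-go").getOrCreate()
>>>
>>> # One read over a whole date-partitioned directory instead of a driver-side
>>> # loop of single-day jobs; Spark parallelizes the work across partitions.
>>> df = (
>>>     spark.read.parquet("hdfs://input/")
>>>     .filter(F.col("day").between("2019-01-01", "2021-12-31"))
>>> )
>>>
>>> df.groupBy("day").count().write.mode("overwrite").parquet("hdfs://output/daily_counts")
>>>
>>> spark.stop()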
>>>
>>> On Wed, Mar 30, 2022, 10:16 AM Joris Billen <
>>> joris.bil...@bigindustries.be> wrote:
>>>
>>>> Hi,
>>>> I have a pyspark job submitted through spark-submit that does some heavy
>>>> processing for 1 day of data. It runs with no errors. I have to loop over
>>>> many days, so I run this Spark job in a loop. I notice that after a couple
>>>> of executions the memory usage is increasing on all worker nodes, and
>>>> eventually this leads to failures. My job does some caching, but I
>>>> understand that when the job ends successfully, the SparkContext is
>>>> destroyed and the cache should be cleared. However, it seems that
>>>> something keeps filling the memory a bit more after each run. This is the
>>>> memory behaviour over time, which in the end will start leading to failures:
>>>>
>>>> (what we see is: green = physical memory used, green-blue = physical
>>>> memory cached, grey = memory capacity, a straight line around 31GB)
>>>> This runs on a healthy Spark 2.4 and was already optimized to a stable job
>>>> in terms of spark-submit resource parameters
>>>> (driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait).
>>>> Any clue how to "really" clear the memory in between jobs? Basically, at
>>>> the moment I can loop 10x and then I need to restart my cluster so all
>>>> memory is cleared completely.
>>>>
>>>>
>>>> Thanks for any info!
>>>>
>>>> <Screenshot 2022-03-30 at 15.28.24.png>
>>>
>>>
>>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>>
>>
>
