Your statement below:

I believe I have found the issue: the job writes data to HBase, which is on
the same cluster.
When I keep on processing data and writing with Spark to HBase, eventually
the garbage collection cannot keep up anymore for HBase, and the HBase
memory consumption increases. As the cluster hosts both HBase and Spark,
this leads to an overall increase and at some point you hit the limit of
the available memory on each worker.
I don't think the Spark memory is increasing over time.
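
One way to check this on a worker is to compare the resident memory of the
HBase and Spark JVMs between iterations. A minimal sketch, assuming Linux
workers where `ps -eo rss,args` is available and that the region server and
the Spark executors show up under their usual main class names
(HRegionServer, CoarseGrainedExecutorBackend). The function names, the group
labels and the sample text are made up for illustration:

```python
import subprocess
from collections import defaultdict

# Assumed markers in the JVM command line; adjust to what `ps` shows on your workers.
GROUPS = {
    "hbase": "HRegionServer",
    "spark": "CoarseGrainedExecutorBackend",
}


def rss_by_group(ps_output, groups=GROUPS):
    """Sum resident memory (KiB) per group from `ps -eo rss,args` output."""
    totals = defaultdict(int)
    for line in ps_output.splitlines():
        parts = line.split(None, 1)
        if len(parts) != 2 or not parts[0].isdigit():
            continue  # skip the header row and malformed lines
        rss_kib, args = int(parts[0]), parts[1]
        for name, marker in groups.items():
            if marker in args:
                totals[name] += rss_kib
    return dict(totals)


def snapshot():
    """Take one measurement on the local machine (Linux `ps` assumed)."""
    out = subprocess.run(
        ["ps", "-eo", "rss,args"], capture_output=True, text=True, check=True
    ).stdout
    return rss_by_group(out)


# Illustrative input only, not real measurements.
sample = """\
  RSS COMMAND
 2048 java -Xmx4g org.apache.hadoop.hbase.regionserver.HRegionServer start
 1024 java org.apache.spark.executor.CoarseGrainedExecutorBackend --cores 2
  512 java org.apache.spark.executor.CoarseGrainedExecutorBackend --cores 2
  100 sshd
"""
print(rss_by_group(sample))
```

Calling snapshot() on each worker after every run and comparing the "hbase"
and "spark" totals should show which of the two keeps growing.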


   1. Where is your cluster, on-prem? Do you have a Hadoop cluster
   with Spark using the same nodes as HDFS?
   2. Is your HBase clustered or standalone, and has it been created on the
   HDFS nodes?
   3. Are you writing to HBase through Phoenix or straight to HBase?
   4. Where does S3 come into this?

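
On the looping over days described in the quoted thread below, a minimal
sketch of a driver loop that splits a date range into small batches and
builds one spark-submit command per batch. The script name `daily_job.py`
and the `--start`/`--end` arguments are hypothetical; only `spark-submit`
itself is real, and the batch size is whatever the cluster can handle in one
go:

```python
import subprocess
from datetime import date, timedelta


def day_batches(start, end, batch_days):
    """Yield (first_day, last_day) pairs covering start..end inclusive."""
    if batch_days < 1:
        raise ValueError("batch_days must be at least 1")
    current = start
    while current <= end:
        last = min(current + timedelta(days=batch_days - 1), end)
        yield current, last
        current = last + timedelta(days=1)


def submit_command(first, last, script="daily_job.py"):
    """Build the argument list for one batch (script and flags are hypothetical)."""
    return [
        "spark-submit",
        script,
        "--start", first.isoformat(),
        "--end", last.isoformat(),
    ]


def run_all(start, end, batch_days, dry_run=True):
    """Run batches one after another; stop at the first failing batch."""
    for first, last in day_batches(start, end, batch_days):
        cmd = submit_command(first, last)
        if dry_run:
            print(" ".join(cmd))  # only show what would be submitted
        else:
            subprocess.run(cmd, check=True)  # raises CalledProcessError on failure


# Dry run over one week in batches of three days.
run_all(date(2022, 3, 28), date(2022, 4, 3), batch_days=3)
```

Each batch is a separate Spark application, so the driver and executors exit
between batches. That would not help if the growth is on the HBase side.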

HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 6 Apr 2022 at 16:41, Joris Billen <joris.bil...@bigindustries.be>
wrote:

> Hi,
> thanks for your reply.
>
>
> I believe I have found the issue: the job writes data to HBase, which is on
> the same cluster.
> When I keep on processing data and writing with Spark to HBase,
> eventually the garbage collection cannot keep up anymore for HBase, and
> the HBase memory consumption increases. As the cluster hosts both HBase
> and Spark, this leads to an overall increase and at some point you hit the
> limit of the available memory on each worker.
> I don't think the Spark memory is increasing over time.
>
>
>
> Here are more details:
>
> **Spark: 2.4
> **Operation: many Spark SQL statements followed by writing data to a NoSQL
> db from Spark,
> like this:
> df=read(fromhdfs)
> df2=spark.sql(using df 1)
> ..df10=spark.sql(using df9)
> spark.sql(CACHE TABLE df10)
> df11 =spark.sql(using df10)
> df11.write
> df12 =spark.sql(using df10)
> df12.write
> df13 =spark.sql(using df10)
> df13.write
> **Caching: yes, one df that I will use to eventually write 3 x to a db
> (those 3 are different)
> **Loops: I need to process several years, and processing 1 day is
> already a complex process (40 minutes on a 9-node cluster running quite a
> few executors). In the end everything is executed in one go, and there is a
> limit to how much data I can process in one go with the available resources.
> Some people here pointed out they believe this looping should not be
> necessary. But what is the alternative?
> -> Maybe I can write to disk somewhere in the middle, and read again from
> there so that in the end not all must happen in one go in memory.
>
>
>
>
>
>
>
> On 5 Apr 2022, at 14:58, Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>
> Hi,
>
> can you please give details around:
> Spark version, what operation you are running, why in loops,
> whether you are caching any data or not, and whether you are
> referencing variables to create them, as in the following expression
> where we reference x to create x: x = x + 1
>
> Thanks and Regards,
> Gourav Sengupta
>
> On Mon, Apr 4, 2022 at 10:51 AM Joris Billen <
> joris.bil...@bigindustries.be> wrote:
>
>> Clear - probably not a good idea.
>>
>> But a previous comment said “you are doing everything in the end in one
>> go”.
>> So this made me wonder: if your only action is a write at the end,
>> after lots of complex transformations, then what is the alternative to
>> doing everything all at once at the end? My
>> understanding is that if there is no need for an action earlier, you will
>> do it all at the end, which means there is a limit to how many days you
>> can process at once. Hence the solution is to loop over a couple of days,
>> and always submit the same Spark job, just with different input.
>>
>>
>> Thanks!
>>
>> On 1 Apr 2022, at 15:26, Sean Owen <sro...@gmail.com> wrote:
>>
>> This feels like premature optimization, and not clear it's optimizing,
>> but maybe.
>> Caching things that are used once is worse than not caching. It looks
>> like a straight-line through to the write, so I doubt caching helps
>> anything here.
>>
>> On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <
>> joris.bil...@bigindustries.be> wrote:
>>
>>> Hi,
>>> as said, thanks for the little discussion over mail.
>>> I understand that the action is triggered at the end, at the write, and
>>> then all of a sudden everything is executed at once. But I don't really need
>>> to trigger an action before. I am caching somewhere a df that will be
>>> reused several times (slightly updated pseudocode below).
>>>
>>> Question: is it then better practice to already trigger some actions on
>>> intermediate data frames (like df4 and df8), and cache them? So that these
>>> actions will not be that expensive yet, and the write actions at the end
>>> will require fewer resources, which would allow processing more days in one
>>> go? Like what is added in the improvement section of the pseudocode
>>> below?
>>>
>>>
>>>
>>> *pseudocode:*
>>>
>>>
>>> *loop over all days:*
>>> *    spark submit 1 day*
>>>
>>>
>>>
>>> with spark submit (overly simplified)=
>>>
>>>
>>> *  df=spark.read(hdfs://somepath)*
>>> *  …*
>>> *   ##IMPROVEMENT START*
>>> *   df4=spark.sql(some stuff with df3)*
>>> *   spark.sql(CACHE TABLE df4)*
>>> *   …*
>>> *   df8=spark.sql(some stuff with df7)*
>>> *   spark.sql(CACHE TABLE df8)*
>>> *  ##IMPROVEMENT END*
>>> *   ...*
>>> *   df12=spark.sql(complex stuff with df11)*
>>> *  spark.sql(CACHE TABLE df12)*
>>> *   ...*
>>> *  df13=spark.sql( complex stuff with df12)*
>>> *  df13.write *
>>> *  df14=spark.sql( some other complex stuff with df12)*
>>> *  df14.write *
>>> *  df15=spark.sql( some completely other complex stuff with df12)*
>>> *  df15.write *
>>>
>>>
>>>
>>>
>>>
>>>
>>> Thanks!
>>>
>>>
>>>
>>> On 31 Mar 2022, at 14:37, Sean Owen <sro...@gmail.com> wrote:
>>>
>>> If that is your loop unrolled, then you are not doing parts of work at a
>>> time. That will execute all operations in one go when the write finally
>>> happens. That's OK, but may be part of the problem. For example if you are
>>> filtering for a subset, processing, and unioning, then that is just a
>>> harder and slower way of applying the transformation to all data at once.
>>>
>>> On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <
>>> joris.bil...@bigindustries.be> wrote:
>>>
>>>> Thanks for the reply :-)
>>>>
>>>> I am using pyspark. Basically my code (simplified) is:
>>>>
>>>> df=spark.read.csv(hdfs://somehdfslocation)
>>>> df1=spark.sql (complex statement using df)
>>>> ...
>>>> dfx=spark.sql(complex statement using df x-1)
>>>> ...
>>>> dfx15.write()
>>>>
>>>>
>>>> What exactly is meant by "closing resources"? Is it just unpersisting
>>>> cached dataframes at the end and stopping the spark context explicitly:
>>>> sc.stop()?
>>>>
>>>>
>>>> For processing many years at once versus a chunk in a loop: I see that
>>>> if I go up to a certain number of days, one iteration will start to have
>>>> tasks that fail. So I only take a limited number of days, and do this
>>>> process several times. Isn't this normal, as you are always somehow limited
>>>> in terms of resources (I have 9 nodes with 32GB)? Or is it the case that
>>>> in theory you could process any volume, provided you wait long enough? I
>>>> guess Spark can only break down the tasks up to a certain level (based on
>>>> the datasets' and the intermediate results’ partitions) and at some moment
>>>> you hit the limit where your resources are not sufficient anymore to
>>>> process one such task? Maybe you can tweak it a bit, but in the end you’ll
>>>> hit a limit?
>>>>
>>>>
>>>>
>>>> Concretely, the following topics would be interesting to find out more
>>>> about (links):
>>>> - where to see what you are still consuming after a Spark job has ended
>>>> if you didn't close resources
>>>> - memory leaks in pyspark
>>>> - a good article about closing resources (you find tons of snippets on how
>>>> to start a Spark context + config for number/cores/memory of
>>>> workers/executors etc., but I never saw a focus on making sure you clean
>>>> up -> or is it just stopping the Spark context?)
>>>>
>>>>
>>>>
>>>>
>>>> On 30 Mar 2022, at 21:24, Bjørn Jørgensen <bjornjorgen...@gmail.com>
>>>> wrote:
>>>>
>>>> It's quite impossible for anyone to answer your question about what is
>>>> eating your memory without even knowing what language you are using.
>>>>
>>>> If you are using C, then it's always pointers; that's the memory issue.
>>>> If you are using Python, one cause can be not using a context
>>>> manager; see "Context Managers and Python's with Statement"
>>>> <https://realpython.com/python-with-statement/>
>>>>
>>>> Another can be not closing resources after use.
>>>>
>>>> In my experience you can process 3 years or more of data, IF you are
>>>> closing opened resources.
>>>> I use the web GUI http://spark:4040 to follow what spark is doing.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, 30 Mar 2022 at 17:41, Joris Billen <
>>>> joris.bil...@bigindustries.be> wrote:
>>>>
>>>>> Thanks for the answer - much appreciated! This forum is very useful :-)
>>>>>
>>>>> I didn't know the SparkContext stays alive. I guess this is eating up
>>>>> memory. The eviction means that it knows it should clear some of the
>>>>> old cached memory to be able to store new data. In case anyone has good
>>>>> articles about memory leaks, I would be interested to read them.
>>>>> I will try to add the following lines at the end of my job (as I cached
>>>>> the table in Spark SQL):
>>>>>
>>>>>
>>>>> *sqlContext.sql("UNCACHE TABLE mytableofinterest ")*
>>>>> *spark.stop()*
>>>>>
>>>>>
>>>>> Wrt looping: if I want to process 3 years of data, my modest cluster
>>>>> will never do it in one go, I would expect? I have to break it down into
>>>>> smaller pieces and run that in a loop (1 day is already lots of data).
>>>>>
>>>>>
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 30 Mar 2022, at 17:25, Sean Owen <sro...@gmail.com> wrote:
>>>>>
>>>>> The Spark context does not stop when a job does. It stops when you
>>>>> stop it. There could be many ways mem can leak. Caching maybe - but it
>>>>> will evict. You should be clearing caches when no longer needed.
>>>>>
>>>>> I would guess it is something else your program holds on to in its
>>>>> logic.
>>>>>
>>>>> Also consider not looping; there is probably a faster way to do it in
>>>>> one go.
>>>>>
>>>>> On Wed, Mar 30, 2022, 10:16 AM Joris Billen <
>>>>> joris.bil...@bigindustries.be> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I have a pyspark job submitted through spark-submit that does some
>>>>>> heavy processing for 1 day of data. It runs with no errors. I have to
>>>>>> loop over many days, so I run this Spark job in a loop. I notice that
>>>>>> after a couple of executions the memory is increasing on all worker
>>>>>> nodes, and eventually this leads to failures. My job does some caching,
>>>>>> but I understand that when the job ends successfully, the SparkContext
>>>>>> is destroyed and the cache should be cleared. However, it seems that
>>>>>> something keeps on filling the memory a bit more after each run. This
>>>>>> is the memory behaviour over time, which in the end will start leading
>>>>>> to failures:
>>>>>>
>>>>>> (what we see is: green=physical memory used, green-blue=physical
>>>>>> memory cached, grey=memory capacity=straight line around 31GB)
>>>>>> This runs on a healthy Spark 2.4 and was already optimized to come to
>>>>>> a stable job in terms of spark-submit resource parameters (like
>>>>>> driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait).
>>>>>> Any clue how to “really” clear the memory in between jobs?
>>>>>> Currently I can loop 10x and then need to restart my cluster so
>>>>>> all memory is cleared completely.
>>>>>>
>>>>>>
>>>>>> Thanks for any info!
>>>>>>
>>>>>> <Screenshot 2022-03-30 at 15.28.24.png>
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Bjørn Jørgensen
>>>> Vestre Aspehaug 4, 6010 Ålesund
>>>> Norge
>>>>
>>>> +47 480 94 297
>>>>
>>>>
>>>>
>>>
>>
>
