1. Where does S3 come into this?

He is processing the data one day at a time. So, to dump each day's result to
fast storage, he can write it to S3 as Parquet files.
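
A minimal sketch of that per-day dump, assuming the result for one day is
already a PySpark DataFrame. The function names, the bucket URI and the
`day=` directory layout are made up for illustration and are not from the
original job.

```python
from datetime import date


def daily_parquet_path(base_uri: str, day: date) -> str:
    """Build a per-day output location such as <base>/day=2022-04-06."""
    return f"{base_uri.rstrip('/')}/day={day.isoformat()}"


def write_day(df, base_uri: str, day: date) -> str:
    """Write one day's result as Parquet and return the path that was used.

    `df` is expected to be a pyspark.sql.DataFrame. Overwrite mode means a
    re-run of the same day replaces that day's earlier output.
    """
    path = daily_parquet_path(base_uri, day)
    df.write.mode("overwrite").parquet(path)
    return path
```

It would be called once per processed day, for example
`write_day(result_df, "s3a://example-bucket/results", date(2022, 4, 6))`,
where the bucket name is a placeholder.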

On Wed, 6 Apr 2022 at 22:27, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

>
> Your statement below:
>
>
> I believe I have found the issue: the job writes data to HBase, which is on
> the same cluster.
> When I keep processing data and writing it to HBase with Spark,
> eventually HBase's garbage collection cannot keep up anymore, and
> HBase's memory consumption increases. As the cluster hosts both HBase
> and Spark, this leads to an overall increase, and at some point you hit the
> limit of the available memory on each worker.
> I don't think the Spark memory is increasing over time.
>
>
>    1. Is your cluster on-prem? Do you have a Hadoop cluster
>    with Spark running on the same nodes as HDFS?
>    2. Is your HBase clustered or standalone, and was it created on the HDFS
>    nodes?
>    3. Are you writing to HBase through Phoenix or straight to HBase?
>    4. Where does S3 come into this?
>
>
> HTH
>
>
> On Wed, 6 Apr 2022 at 16:41, Joris Billen <joris.bil...@bigindustries.be>
> wrote:
>
>> Hi,
>> thanks for your reply.
>>
>>
>> I believe I have found the issue: the job writes data to HBase, which is
>> on the same cluster.
>> When I keep processing data and writing it to HBase with Spark,
>> eventually HBase's garbage collection cannot keep up anymore, and
>> HBase's memory consumption increases. As the cluster hosts both HBase
>> and Spark, this leads to an overall increase, and at some point you hit the
>> limit of the available memory on each worker.
>> I don't think the Spark memory is increasing over time.
>>
>>
>>
>> Here are more details:
>>
>> **Spark: 2.4
>> **Operation: many Spark SQL statements followed by writing data to a
>> NoSQL DB from Spark,
>> like this:
>> df = read(from HDFS)
>> df2 = spark.sql(using df)
>> ... df10 = spark.sql(using df9)
>> spark.sql("CACHE TABLE df10")
>> df11 = spark.sql(using df10)
>> df11.write
>> df12 = spark.sql(using df10)
>> df12.write
>> df13 = spark.sql(using df10)
>> df13.write
>> **Caching: yes, one df that I eventually use to write 3 times to a DB
>> (those 3 writes are different)
>> **Loops: I need to process several years, and processing 1 day is
>> already a complex process (40 minutes on a 9-node cluster running quite a
>> few executors). In the end it does everything in one go, and there is a
>> limit to how much data I can process in one go with the available resources.
>> Some people here pointed out that they believe this looping should not be
>> necessary. But what is the alternative?
>> -> Maybe I can write to disk somewhere in the middle and read again from
>> there, so that in the end not everything must happen in one go in memory.
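
The "write to disk in the middle and read again" idea could look roughly like
the sketch below. It assumes `spark` is an active SparkSession and `df` is an
intermediate DataFrame; the helper name `materialize` and the path are
hypothetical. A DataFrame read back from Parquet has a plan that starts at the
files, so later actions do not recompute the steps that produced `df`.

```python
def materialize(spark, df, path: str):
    """Write `df` to Parquet at `path` and return a DataFrame read from it.

    The write is an action, so the upstream transformations run here, once.
    The returned DataFrame only depends on the files under `path`.
    """
    df.write.mode("overwrite").parquet(path)
    return spark.read.parquet(path)
```

For example, `df10 = materialize(spark, df10, "hdfs:///tmp/df10_stage")` (the
path is a placeholder) before deriving the three outputs that get written.
Whether this helps more than caching depends on the job; it trades extra I/O
for a shorter plan.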
>>
>>
>>
>>
>>
>>
>>
>> On 5 Apr 2022, at 14:58, Gourav Sengupta <gourav.sengu...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> can you please give details around:
>> the Spark version, the operation you are running, why it runs in loops,
>> whether you are caching any data, and whether you are
>> referencing variables to create themselves, as in the following expression
>> where we reference x to create x: x = x + 1
>>
>> Thanks and Regards,
>> Gourav Sengupta
>>
>> On Mon, Apr 4, 2022 at 10:51 AM Joris Billen <
>> joris.bil...@bigindustries.be> wrote:
>>
>>> Clear. Probably not a good idea.
>>>
>>> But a previous comment said “you are doing everything in the end in one
>>> go”.
>>> So this made me wonder: if your only action is a write at the end,
>>> after lots of complex transformations, then what is the alternative to
>>> doing everything at once at the end? My
>>> understanding is that if there is no need for an action earlier, everything
>>> runs at the end, which means there is a limit to how many days you
>>> can process at once. Hence the solution is to loop over a couple of days at
>>> a time and always submit the same Spark job, just with different input.
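
The looping described above, sketched as a small driver. This is only an
illustration: the script name `daily_job.py` and its `--start`/`--end`
arguments are hypothetical, and the chunk size would have to be found by
experiment on the actual cluster.

```python
from datetime import date, timedelta
from typing import Iterator, List


def day_chunks(start: date, end: date, chunk_size: int) -> Iterator[List[date]]:
    """Yield consecutive lists of at most `chunk_size` days, end inclusive."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    current = start
    while current <= end:
        chunk = []
        while current <= end and len(chunk) < chunk_size:
            chunk.append(current)
            current += timedelta(days=1)
        yield chunk


def submit_command(chunk: List[date], script: str = "daily_job.py") -> List[str]:
    """Build a spark-submit command line for one chunk of days.

    The --start/--end arguments belong to the (hypothetical) job script,
    not to spark-submit itself.
    """
    return [
        "spark-submit", script,
        "--start", chunk[0].isoformat(),
        "--end", chunk[-1].isoformat(),
    ]
```

Each command could then be run with `subprocess.run(cmd, check=True)`, so that
every chunk gets a fresh driver and fresh executors.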
>>>
>>>
>>> Thanks!
>>>
>>> On 1 Apr 2022, at 15:26, Sean Owen <sro...@gmail.com> wrote:
>>>
>>> This feels like premature optimization, and it's not clear it's optimizing,
>>> but maybe.
>>> Caching things that are used once is worse than not caching. It looks
>>> like a straight line through to the write, so I doubt caching helps
>>> anything here.
>>>
>>> On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <
>>> joris.bil...@bigindustries.be> wrote:
>>>
>>>> Hi,
>>>> as said, thanks for the little discussion over mail.
>>>> I understand that the action is triggered at the end, at the write, and
>>>> then all of a sudden everything is executed at once. But I don't really need
>>>> to trigger an action before that. I am caching a df somewhere that will be
>>>> reused several times (slightly updated pseudocode below).
>>>>
>>>> Question: is it then better practice to already trigger some actions on
>>>> intermediate data frames (like df4 and df8), and cache them? That way these
>>>> actions would not be that expensive yet, and the write actions at the end
>>>> would require fewer resources, which would allow processing more days in one
>>>> go? Like what is added between the IMPROVEMENT markers in the pseudocode
>>>> below?
>>>>
>>>>
>>>>
>>>> pseudocode:
>>>>
>>>>
>>>> loop over all days:
>>>>     spark-submit 1 day
>>>>
>>>>
>>>>
>>>> with spark-submit (overly simplified) =
>>>>
>>>>
>>>>   df = spark.read(hdfs://somepath)
>>>>   …
>>>>   ## IMPROVEMENT START
>>>>   df4 = spark.sql(some stuff with df3)
>>>>   spark.sql(CACHE TABLE df4)
>>>>   …
>>>>   df8 = spark.sql(some stuff with df7)
>>>>   spark.sql(CACHE TABLE df8)
>>>>   ## IMPROVEMENT END
>>>>   ...
>>>>   df12 = spark.sql(complex stuff with df11)
>>>>   spark.sql(CACHE TABLE df12)
>>>>   ...
>>>>   df13 = spark.sql(complex stuff with df12)
>>>>   df13.write
>>>>   df14 = spark.sql(some other complex stuff with df12)
>>>>   df14.write
>>>>   df15 = spark.sql(some completely other complex stuff with df12)
>>>>   df15.write
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
>>>> On 31 Mar 2022, at 14:37, Sean Owen <sro...@gmail.com> wrote:
>>>>
>>>> If that is your loop unrolled, then you are not doing parts of work at
>>>> a time. That will execute all operations in one go when the write finally
>>>> happens. That's OK, but may be part of the problem. For example if you are
>>>> filtering for a subset, processing, and unioning, then that is just a
>>>> harder and slower way of applying the transformation to all data at once.
>>>>
>>>> On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <
>>>> joris.bil...@bigindustries.be> wrote:
>>>>
>>>>> Thanks for the reply :-)
>>>>>
>>>>> I am using pyspark. Basically my code (simplified) is:
>>>>>
>>>>> df=spark.read.csv(hdfs://somehdfslocation)
>>>>> df1=spark.sql (complex statement using df)
>>>>> ...
>>>>> dfx=spark.sql(complex statement using df x-1)
>>>>> ...
>>>>> dfx15.write()
>>>>>
>>>>>
>>>>> What exactly is meant by "closing resources"? Is it just unpersisting
>>>>> cached dataframes at the end and stopping the spark context explicitly:
>>>>> sc.stop()?
>>>>>
>>>>>
>>>>> For processing many years at once versus a chunk in a loop: I see that
>>>>> if I go up to a certain number of days, one iteration starts to have
>>>>> tasks that fail. So I only take a limited number of days, and run this
>>>>> process several times. Isn't this normal, as you are always somehow limited
>>>>> in terms of resources (I have 9 nodes with 32GB)? Or is it the case that
>>>>> in theory you could process any volume, provided you wait long enough? I
>>>>> guess Spark can only break down the tasks up to a certain level (based on
>>>>> the partitions of the datasets and of the intermediate results), and at
>>>>> some point you hit the limit where your resources are no longer sufficient
>>>>> to process one such task? Maybe you can tweak it a bit, but in the end
>>>>> you’ll hit a limit?
>>>>>
>>>>>
>>>>>
>>>>> Concretely, it would be interesting to find out more about the
>>>>> following topics (links):
>>>>> - where to see what you are still consuming after a Spark job has ended if
>>>>> you didn't close resources
>>>>> - memory leaks in pyspark
>>>>> - a good article about closing resources (you find tons of snippets on
>>>>> how to start a Spark context + config for number/cores/memory of
>>>>> workers/executors etc., but I never saw a focus on making sure you clean
>>>>> up; or is it just stopping the Spark context?)
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 30 Mar 2022, at 21:24, Bjørn Jørgensen <bjornjorgen...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> It's quite impossible for anyone to answer your question about what is
>>>>> eating your memory without even knowing what language you are using.
>>>>>
>>>>> If you are using C, then it's always pointers; that's the memory issue.
>>>>> If you are using Python, one cause can be not using a context
>>>>> manager; see "Context Managers and Python's with Statement"
>>>>> <https://realpython.com/python-with-statement/>
>>>>>
>>>>> Another can be not closing resources after use.
>>>>>
>>>>> In my experience you can process 3 years or more of data, IF you are
>>>>> closing opened resources.
>>>>> I use the web GUI http://spark:4040 to follow what spark is doing.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, 30 Mar 2022 at 17:41, Joris Billen <
>>>>> joris.bil...@bigindustries.be> wrote:
>>>>>
>>>>>> Thanks for the answer, much appreciated! This forum is very useful :-)
>>>>>>
>>>>>> I didn't know the SparkContext stays alive. I guess this is eating up
>>>>>> memory. Eviction means that it knows it should clear some of the
>>>>>> old cached memory to be able to store new data. In case anyone has good
>>>>>> articles about memory leaks, I would be interested to read them.
>>>>>> I will try to add the following lines at the end of my job (as I cached
>>>>>> the table in Spark SQL):
>>>>>>
>>>>>>
>>>>>> sqlContext.sql("UNCACHE TABLE mytableofinterest")
>>>>>> spark.stop()
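
A hedged sketch of making that cleanup unconditional, so it also runs when the
job fails halfway. It assumes `spark` is a SparkSession and `job` is a callable
containing the actual processing; the wrapper name `run_with_cleanup` is made
up. Note that this only releases Spark-side resources and cannot free memory
held by other services on the same nodes.

```python
def run_with_cleanup(spark, job, cached_tables=()):
    """Run job(spark), then uncache the given tables and stop the session.

    The outer finally runs whether the job returns or raises; the inner
    finally makes sure spark.stop() is called even if uncaching fails.
    """
    try:
        return job(spark)
    finally:
        try:
            for name in cached_tables:
                spark.catalog.uncacheTable(name)
        finally:
            spark.stop()
```

Usage would be along the lines of
`run_with_cleanup(spark, process_one_day, ["mytableofinterest"])`, where
`process_one_day` stands for the existing job logic.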
>>>>>>
>>>>>>
>>>>>> Wrt looping: if I want to process 3 years of data, my modest cluster
>>>>>> will never do it in one go, I would expect? I have to break it down into
>>>>>> smaller pieces and run those in a loop (1 day is already lots of data).
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 30 Mar 2022, at 17:25, Sean Owen <sro...@gmail.com> wrote:
>>>>>>
>>>>>> The Spark context does not stop when a job does. It stops when you
>>>>>> stop it. There could be many ways mem can leak. Caching maybe - but it
>>>>>> will evict. You should be clearing caches when no longer needed.
>>>>>>
>>>>>> I would guess it is something else your program holds on to in its
>>>>>> logic.
>>>>>>
>>>>>> Also consider not looping; there is probably a faster way to do it in
>>>>>> one go.
>>>>>>
>>>>>> On Wed, Mar 30, 2022, 10:16 AM Joris Billen <
>>>>>> joris.bil...@bigindustries.be> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I have a pyspark job submitted through spark-submit that does some
>>>>>>> heavy processing for 1 day of data. It runs with no errors. I have to
>>>>>>> loop over many days, so I run this Spark job in a loop. I notice that
>>>>>>> after a couple of executions the memory is increasing on all worker
>>>>>>> nodes, and eventually this leads to failures. My job does some caching,
>>>>>>> but I understand that when the job ends successfully, the SparkContext
>>>>>>> is destroyed and the cache should be cleared. However, it seems that
>>>>>>> something keeps filling the memory a bit more after each run. This is
>>>>>>> the memory behaviour over time, which in the end starts leading to
>>>>>>> failures:
>>>>>>>
>>>>>>> (what we see is: green = physical memory used, green-blue = physical
>>>>>>> memory cached, grey = memory capacity = straight line around 31GB)
>>>>>>> This runs on a healthy Spark 2.4 and was already optimized to arrive
>>>>>>> at a stable job in terms of spark-submit resource parameters like
>>>>>>> driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait.
>>>>>>> Any clue how to “really” clear the memory in between jobs?
>>>>>>> Currently I can loop 10x and then need to restart my cluster
>>>>>>> so that all memory is cleared completely.
>>>>>>>
>>>>>>>
>>>>>>> Thanks for any info!
>>>>>>>
>>>>>>> <Screenshot 2022-03-30 at 15.28.24.png>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Bjørn Jørgensen
>>>>> Vestre Aspehaug 4, 6010 Ålesund
>>>>> Norge
>>>>>
>>>>> +47 480 94 297
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
