Great, upgrade from 2.4 to 3.X.X. It seems like you can use unpersist
<https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.unpersist.html>
after
df=read(fromhdfs)
df2=spark.sql(using df1)
...
df10=spark.sql(using df9)
?
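To make that concrete, here is a minimal PySpark sketch of the pattern with an explicit unpersist once the cached result is no longer needed. The paths, view names and columns below are made up for illustration, not taken from the actual job:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("daily-job").getOrCreate()

    # stand-in for df = read(fromhdfs); path and schema are hypothetical
    df = spark.read.parquet("hdfs:///data/events/day=2022-03-01")
    df.createOrReplaceTempView("events")

    # stand-in for the df2 .. df10 chain of spark.sql() steps
    df10 = spark.sql("SELECT user_id, count(*) AS cnt FROM events GROUP BY user_id")
    df10.cache()                      # same effect as CACHE TABLE, but keeps a Python handle
    df10.createOrReplaceTempView("df10")

    # the downstream writes reuse the cached df10
    spark.sql("SELECT * FROM df10 WHERE cnt > 100").write.mode("overwrite").parquet("hdfs:///out/heavy")
    spark.sql("SELECT * FROM df10 WHERE cnt <= 100").write.mode("overwrite").parquet("hdfs:///out/light")

    df10.unpersist()                  # release the cached blocks as soon as the last write is done

You can also call spark.catalog.clearCache() to drop everything that is still cached in one go.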
I did use Kubernetes and Spark with the S3 API via MinIO
<https://min.io/solutions/hdfs-migration> and it works :)
With Kubernetes you deploy everything with k8s resource limits
<https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/>
If you have done it this way, then you won't have this problem.

On Wed, 6 Apr 2022 at 19:21, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> Hi,
> super duper.
>
> Please try to see if you can write out the data to S3, and then write a
> load script to load that data from S3 to HBase.
>
>
> Regards,
> Gourav Sengupta
>
>
> On Wed, Apr 6, 2022 at 4:39 PM Joris Billen <joris.bil...@bigindustries.be>
> wrote:
>
>> Hi,
>> thanks for your reply.
>>
>>
>> I believe I have found the issue: the job writes data to HBase, which is
>> on the same cluster.
>> When I keep on processing data and writing to HBase with Spark,
>> eventually garbage collection cannot keep up anymore for HBase, and the
>> HBase memory consumption increases. As the cluster hosts both HBase and
>> Spark, this leads to an overall increase, and at some point you hit the
>> limit of the available memory on each worker.
>> I don't think the Spark memory is increasing over time.
>>
>>
>>
>> Here are more details:
>>
>> **Spark: 2.4
>> **Operation: many Spark SQL statements followed by writing data to a
>> NoSQL db from Spark, like this:
>> df=read(fromhdfs)
>> df2=spark.sql(using df1)
>> ...
>> df10=spark.sql(using df9)
>> spark.sql(CACHE TABLE df10)
>> df11=spark.sql(using df10)
>> df11.write
>> df12=spark.sql(using df10)
>> df12.write
>> df13=spark.sql(using df10)
>> df13.write
>> **Caching: yes, one df that I will eventually use to write 3x to a db
>> (those 3 writes are different)
>> **Loops: I need to process several years, and processing 1 day is
>> already a complex process (40 minutes on a 9-node cluster running quite a
>> few executors). Since everything is executed in one go at the end, there
>> is a limit to how much data I can process at once with the available
>> resources.
>> Some people here pointed out they believe this looping should not be
>> necessary. But what is the alternative?
>> —> Maybe I can write to disk somewhere in the middle, and read again from
>> there, so that in the end not everything must happen in one go in memory.
>>
>>
>>
>> On 5 Apr 2022, at 14:58, Gourav Sengupta <gourav.sengu...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> can you please give details around:
>> spark version, what operation you are running, why in loops,
>> whether you are caching any data or not, and whether you are
>> referencing the variables to create them, like in the following
>> expression where we reference x to create x: x = x + 1
>>
>> Thanks and Regards,
>> Gourav Sengupta
>>
>> On Mon, Apr 4, 2022 at 10:51 AM Joris Billen <
>> joris.bil...@bigindustries.be> wrote:
>>
>>> Clear - probably not a good idea.
>>>
>>> But a previous comment said "you are doing everything in the end in one
>>> go".
>>> So this made me wonder: if your only action is a write at the end, after
>>> lots of complex transformations, then what is the alternative to writing
>>> at the end, which means doing everything all at once? My understanding is
>>> that if there is no need for an action earlier, you will do everything at
>>> the end, which means there is a limit to how many days you can process at
>>> once. And hence the solution is to loop over a couple of days, and always
>>> submit the same Spark job, just with different input.
>>>
>>>
>>> Thanks!
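On the "write to disk somewhere in the middle" idea mentioned above: one way to avoid doing everything in a single lineage is to materialize an intermediate result to HDFS and read it back, so the final writes start from the staged data instead of recomputing the whole chain. A rough sketch, where the paths, columns and the aggregation are placeholders (assuming PySpark):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("staged-job").getOrCreate()

    # stage 1: the heavy transformations, materialized to disk (hypothetical path)
    df = spark.read.parquet("hdfs:///data/events/day=2022-03-01")
    intermediate = df.groupBy("user_id").agg(F.count("*").alias("cnt"))
    intermediate.write.mode("overwrite").parquet("hdfs:///tmp/staged/day=2022-03-01")

    # stage 2: read the staged result back; its lineage now starts here, so the
    # final writes no longer re-run everything from the raw input in one go
    staged = spark.read.parquet("hdfs:///tmp/staged/day=2022-03-01")
    staged.filter(F.col("cnt") > 100).write.mode("overwrite").parquet("hdfs:///out/heavy")
    staged.filter(F.col("cnt") <= 100).write.mode("overwrite").parquet("hdfs:///out/light")

    spark.stop()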
>>>
>>> On 1 Apr 2022, at 15:26, Sean Owen <sro...@gmail.com> wrote:
>>>
>>> This feels like premature optimization, and it's not clear it's
>>> optimizing anything, but maybe.
>>> Caching things that are used once is worse than not caching. It looks
>>> like a straight line through to the write, so I doubt caching helps
>>> anything here.
>>>
>>> On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <
>>> joris.bil...@bigindustries.be> wrote:
>>>
>>>> Hi,
>>>> as said, thanks for the little discussion over mail.
>>>> I understand that the action is triggered at the end, at the write, and
>>>> then all of a sudden everything is executed at once. But I don't really
>>>> need to trigger an action before that. I am caching somewhere a df that
>>>> will be reused several times (slightly updated pseudocode below).
>>>>
>>>> Question: is it then better practice to already trigger some actions on
>>>> intermediate data frames (like df4 and df8), and cache them? So that
>>>> these actions will not be that expensive yet, and the actions to write at
>>>> the end will require fewer resources, which would allow processing more
>>>> days in one go? Like what is added in red in the improvement section in
>>>> the pseudocode below?
>>>>
>>>>
>>>>
>>>> *pseudocode:*
>>>>
>>>>
>>>> *loop over all days:*
>>>> *  spark submit 1 day*
>>>>
>>>>
>>>>
>>>> with spark submit (overly simplified) =
>>>>
>>>>
>>>> *  df=spark.read(hdfs://somepath)*
>>>> *  ...*
>>>> *  ##IMPROVEMENT START*
>>>> *  df4=spark.sql(some stuff with df3)*
>>>> *  spark.sql(CACHE TABLE df4)*
>>>> *  ...*
>>>> *  df8=spark.sql(some stuff with df7)*
>>>> *  spark.sql(CACHE TABLE df8)*
>>>> *  ##IMPROVEMENT END*
>>>> *  ...*
>>>> *  df12=spark.sql(complex stuff with df11)*
>>>> *  spark.sql(CACHE TABLE df12)*
>>>> *  ...*
>>>> *  df13=spark.sql(some complex stuff with df12)*
>>>> *  df13.write*
>>>> *  df14=spark.sql(some other complex stuff with df12)*
>>>> *  df14.write*
>>>> *  df15=spark.sql(some completely other complex stuff with df12)*
>>>> *  df15.write*
>>>>
>>>>
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
>>>> On 31 Mar 2022, at 14:37, Sean Owen <sro...@gmail.com> wrote:
>>>>
>>>> If that is your loop unrolled, then you are not doing parts of the work
>>>> at a time. That will execute all operations in one go when the write
>>>> finally happens. That's OK, but may be part of the problem. For example,
>>>> if you are filtering for a subset, processing, and unioning, then that is
>>>> just a harder and slower way of applying the transformation to all data
>>>> at once.
>>>>
>>>> On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <
>>>> joris.bil...@bigindustries.be> wrote:
>>>>
>>>>> Thanks for the reply :-)
>>>>>
>>>>> I am using pyspark. Basically my code (simplified) is:
>>>>>
>>>>> df=spark.read.csv(hdfs://somehdfslocation)
>>>>> df1=spark.sql(complex statement using df)
>>>>> ...
>>>>> dfx=spark.sql(complex statement using dfx-1)
>>>>> ...
>>>>> df15.write()
>>>>>
>>>>>
>>>>> What exactly is meant by "closing resources"? Is it just unpersisting
>>>>> cached dataframes at the end and stopping the spark context explicitly:
>>>>> sc.stop()?
>>>>>
>>>>>
>>>>> For processing many years at once versus a chunk in a loop: I see that
>>>>> if I go up to a certain number of days, one iteration will start to have
>>>>> tasks that fail. So I only take a limited number of days, and do this
>>>>> process several times. Isn't this normal, as you are always somehow
>>>>> limited in terms of resources (I have 9 nodes with 32GB)? Or is it the
>>>>> case that in theory you could process any volume, if you wait long
>>>>> enough?
>>>>> I guess Spark can only break down the tasks up to a certain level
>>>>> (based on the datasets' and the intermediate results' partitions), and
>>>>> at some point you hit the limit where your resources are not sufficient
>>>>> anymore to process one such task? Maybe you can tweak it a bit, but in
>>>>> the end you'll hit a limit?
>>>>>
>>>>>
>>>>>
>>>>> Concretely, the following topics would be interesting to find out more
>>>>> about (links welcome):
>>>>> - where to see what you are still consuming after a spark job ended, if
>>>>> you didn't close resources
>>>>> - memory leaks for pyspark
>>>>> - a good article about closing resources (you find tons of snippets on
>>>>> how to start the spark context + config for number/cores/memory of
>>>>> workers/executors etc., but I never saw a focus on making sure you clean
>>>>> up —> or is it just stopping the spark context?)
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 30 Mar 2022, at 21:24, Bjørn Jørgensen <bjornjorgen...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> It's quite impossible for anyone to answer your question about what is
>>>>> eating your memory without even knowing what language you are using.
>>>>>
>>>>> If you are using C, then it's always pointers; that's the mem issue.
>>>>> If you are using Python, it can be something like not using a context
>>>>> manager; see Context Managers and Python's with Statement
>>>>> <https://realpython.com/python-with-statement/>
>>>>>
>>>>> Another cause can be not closing resources after use.
>>>>>
>>>>> In my experience you can process 3 years or more of data, IF you are
>>>>> closing opened resources.
>>>>> I use the web GUI http://spark:4040 to follow what Spark is doing.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, 30 Mar 2022 at 17:41, Joris Billen <
>>>>> joris.bil...@bigindustries.be> wrote:
>>>>>
>>>>>> Thanks for the answer - much appreciated! This forum is very useful :-)
>>>>>>
>>>>>> I didn't know the SparkContext stays alive. I guess this is eating up
>>>>>> memory. The eviction means that it knows it should clear some of the
>>>>>> old cached memory to be able to store new data. In case anyone has good
>>>>>> articles about memory leaks, I would be interested to read them.
>>>>>> I will try to add the following lines at the end of my job (as I
>>>>>> cached the table in Spark SQL):
>>>>>>
>>>>>>
>>>>>> *sqlContext.sql("UNCACHE TABLE mytableofinterest")*
>>>>>> *spark.stop()*
>>>>>>
>>>>>>
>>>>>> Wrt looping: if I want to process 3 years of data, my modest cluster
>>>>>> will never do it in one go, I would expect? I have to break it down
>>>>>> into smaller pieces and run that in a loop (1 day is already lots of
>>>>>> data).
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 30 Mar 2022, at 17:25, Sean Owen <sro...@gmail.com> wrote:
>>>>>>
>>>>>> The Spark context does not stop when a job does. It stops when you
>>>>>> stop it. There could be many ways mem can leak. Caching maybe - but it
>>>>>> will evict. You should be clearing caches when no longer needed.
>>>>>>
>>>>>> I would guess it is something else your program holds on to in its
>>>>>> logic.
>>>>>>
>>>>>> Also consider not looping; there is probably a faster way to do it in
>>>>>> one go.
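For reference, a minimal sketch of that cleanup at the end of each run, assuming the cached table is the "mytableofinterest" placeholder used above (spark.catalog.clearCache() is the catch-all that drops every cached table and DataFrame):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("daily-job").getOrCreate()

    # ... transformations, CACHE TABLE mytableofinterest, the writes ...

    # explicit cleanup once the day's writes have finished
    spark.sql("UNCACHE TABLE IF EXISTS mytableofinterest")
    spark.catalog.clearCache()   # drops anything else still cached
    spark.stop()                 # stops the SparkContext and releases the executors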
>>>>>>
>>>>>> On Wed, Mar 30, 2022, 10:16 AM Joris Billen <
>>>>>> joris.bil...@bigindustries.be> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I have a pyspark job submitted through spark-submit that does some
>>>>>>> heavy processing for 1 day of data. It runs with no errors. I have to
>>>>>>> loop over many days, so I run this spark job in a loop. I notice that
>>>>>>> after a couple of executions the memory is increasing on all worker
>>>>>>> nodes, and eventually this leads to failures. My job does some caching,
>>>>>>> but I understand that when the job ends successfully, the sparkcontext
>>>>>>> is destroyed and the cache should be cleared. However, it seems that
>>>>>>> something keeps filling the memory a bit more after each run. This is
>>>>>>> the memory behaviour over time, which in the end will start leading to
>>>>>>> failures:
>>>>>>>
>>>>>>> (what we see is: green = physical memory used, green-blue = physical
>>>>>>> memory cached, grey = memory capacity, a straight line around 31GB)
>>>>>>> This runs on a healthy Spark 2.4 and was already optimized to come to
>>>>>>> a stable job in terms of spark-submit resource parameters
>>>>>>> (driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait).
>>>>>>> Any clue how to "really" clear the memory in between jobs? Basically,
>>>>>>> currently I can loop 10x and then need to restart my cluster so all
>>>>>>> memory is cleared completely.
>>>>>>>
>>>>>>>
>>>>>>> Thanks for any info!
>>>>>>>
>>>>>>> <Screenshot 2022-03-30 at 15.28.24.png>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Bjørn Jørgensen
>>>>> Vestre Aspehaug 4, 6010 Ålesund
>>>>> Norge
>>>>>
>>>>> +47 480 94 297
>>>>
>>>
>>

--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
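As a closing note on Sean's "faster way to do it in one go": instead of looping spark-submit per day, one option is to let a single job read a whole date range and write the output partitioned by day, so there is still only one action at the end but no outer loop. A rough sketch, assuming the input is laid out with a day partition column (the paths and column names are invented):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("range-job").getOrCreate()

    # read every day in the range at once; 'day' is assumed to be a partition column
    events = (spark.read.parquet("hdfs:///data/events")
              .filter(F.col("day").between("2022-01-01", "2022-03-31")))

    daily_counts = events.groupBy("day", "user_id").agg(F.count("*").alias("cnt"))

    # one action at the end, with the output split back out per day
    daily_counts.write.mode("overwrite").partitionBy("day").parquet("hdfs:///out/daily_counts")

    spark.stop()

Whether this fits depends on whether the full range still fits the cluster's resources; if not, the loop plus explicit cache cleanup per iteration remains the pragmatic approach.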