Ack, I think that makes sense. Let's get some of the work done and then maybe we can try getting the data as well.
--
Regards,
Aritra Basu

On Thu, 18 Dec 2025, 6:47 pm Jarek Potiuk, <[email protected]> wrote:

> > Jarek do you think we should try to get some telemetry on the size/count of imports that are loaded, the amount of reuse/overlap we have, etc?
>
> Yes, but not necessarily telemetry, and not necessarily to start with it. I think initially we need a good set of test dags with various imports that simulate various scenarios. This will be way easier to pull off - and it is what Jeongwoo already did when analysing the COW behaviour with the Local Executor. Then yes - we could gather some "telemetry" information (not automatically, but we could ask our users to do some experimenting on their dags - with the new ray feature, also courtesy of Jeongwoo). But asking for it first will not let us experiment with observed gains while we experiment with fixes. It could be useful later - to ask our "active" users to run some experiments when we enable any improvements we come up with (undoubtedly those should be opt-in features initially) - but I would say we need a good understanding of what we can gain by trying it out locally first.
>
> > Think one off dags importing something ginormous? And additionally it seems like a non trivial change so some data to validate our thoughts would put my mind at ease.
>
> I think more of a compound effect of importing the same thing in multiple processes. If you look at the Local Executor fixes achieved by https://github.com/apache/airflow/pull/58365 - by avoiding the COWs, the per-process memory savings are enormous: 114 MB per process vs 39 MB per process is HUGE. Basically it means that, in the tests done by Jeongwoo, if you have a local executor with 20 processes you save 20 * 70 MB => 1.4 GB of memory - simply because most of the memory remains shared between all those processes instead of being essentially copied. Not to mention the performance cost of all those memory copy operations incurred by COW.
>
> This will be similar for the Celery Executor. Currently - because we allow the workers to run with even high concurrency - what happens in celery workers is that those preforked, long-running processes each keep a copy of almost exactly the same memory resulting from importing some heavy libraries. Note that it's not just the **imported modules** as such - but ALL the memory those imported modules allocate on the heap when they are initialized. Many, many python modules allocate memory for something - the classes they import, pre-loading some data, preparing some in-memory structure so that you avoid doing I/O operations and use an in-memory structure to speed up lookups. Essentially, a lot of memory is read-only once initialized, but because of the way garbage collection works, and the fact that such memory is usually fragmented, garbage collection will - over time - copy all that read-only memory in forked processes (unless you do this gc.freeze/unfreeze dance, which prevents that). In the local executor tests done by Jeongwoo, 70 MB out of the 114 MB used by a forked process is this kind of memory - read-only shared memory that gets copied because of the garbage collector. That's almost a 65% saving per process.
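>
> To make the "dance" concrete, a minimal illustrative sketch of the gc.freeze/unfreeze pattern around a plain os.fork() (the real executor code is more involved; this only shows the mechanism):
>
>     import gc
>     import os
>
>     def run_task():
>         # placeholder for the real per-task work done in the forked child
>         pass
>
>     # Parent: stop automatic collection, then move everything tracked so far
>     # into the permanent generation - future collections skip those objects,
>     # so a GC run in the child will not dirty their (shared) pages via COW.
>     gc.disable()
>     gc.freeze()
>
>     pid = os.fork()
>     if pid == 0:
>         # Child: re-enable collection for objects the child itself creates;
>         # the frozen, inherited objects stay out of GC and remain shared.
>         gc.enable()
>         run_task()
>         os._exit(0)
>     else:
>         os.waitpid(pid, 0)
>         # Parent resumes normal collection once it is done forking.
>         gc.unfreeze()
>         gc.enable()
>
> This is just the general shape of the pattern - where exactly to freeze and unfreeze in the real executor code is part of what needs experimenting.
>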
> We have 16 as the default concurrency in celery now, "prefork" is the default pool, and we have long-running forks - we do not set "max_tasks_per_child", nor any other "recycling" of celery worker processes - so we can safely assume that in the vast majority of installations all the workers hold essentially 16 copies of memory that could be shared, because the gc process COWs it. This is a HUGE saving if we can decrease the default memory usage of pretty much all installations out there. Assuming that even just 50% of a worker process's memory is shareable, that shareable half currently exists in 16 copies instead of one - roughly 8 processes' worth of memory spent on redundant copies. Not 20 or 30 percent of overhead, but multiples of what is actually needed.
>
> So the memory gains should be **huge** for the celery executor IMHO.
>
> The memory gains for the Dag File Processor will not be as big - there are no long-running processes there and we usually have smaller concurrency (2 is the default, though of course in huge installations this goes up) - so there will only be spikes of memory, not sustained usage. There the main gain is performance, and this one is actually quite important, because the same dags are continuously parsed and re-parsed, and if the import-optimisation approach works as hypothesised, all the import savings get multiplied by the number of times dags are parsed. Even more - the same imports used in different dags will also be optimized away. Imagine you have 1000 dags parsed every 5 minutes (a safe assumption - normally you want parsing to be done more frequently) and you save just 1s of import time per dag (not unlikely - a pandas import alone can easily take seconds): that is on the order of 1000 CPU-seconds saved per 5-minute parsing cycle. That's a lot of CPU time saved. Like... A LOT.
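>
> Writing those rough numbers down (purely illustrative - these are the figures quoted in this thread, not measurements):
>
>     # Celery: shareable memory duplicated across prefork children
>     workers = 16        # default worker concurrency
>     shareable_mb = 70   # ~70 MB/process observed as shareable in the Local Executor tests
>     duplicated_mb = (workers - 1) * shareable_mb
>     print(f"~{duplicated_mb} MB (~{duplicated_mb / 1024:.1f} GB) held in redundant copies per worker")
>
>     # Dag File Processor: import time saved per parsing cycle
>     dags = 1000
>     seconds_saved_per_dag = 1.0
>     print(f"~{dags * seconds_saved_per_dag:.0f} CPU-seconds saved per 5-minute parsing cycle")
>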
> Those are the "back-of-the-envelope" calculations I can hypothesise about, but IMHO they are quite close to the real gains we can observe. So ... if we ask "is it worth trying", my answer is - definitely yes. But we should see some real numbers and results of those experiments before we actually release it to users; this could initially be opt-in, and we could ask some friendly users to test it out in "reality" and bring back real numbers and stability feedback, and then we could decide to make it "opt-out".
>
> J.
>
> On Thu, Dec 18, 2025 at 1:25 PM Aritra Basu <[email protected]> wrote:
>
> > Jarek do you think we should try to get some telemetry on the size/count of imports that are loaded, the amount of reuse/overlap we have, etc? Because while this sounds good and might help, I fear that in this aim to reduce our memory footprint we might be overloading (no pun intended) the process? Think one off dags importing something ginormous? And additionally it seems like a non trivial change, so some data to validate our thoughts would put my mind at ease.
> >
> > This is all based on the assumption that what I understood from the thread is correct - that we try to preload imports and freeze them in memory to reduce subsequent loads?
> >
> > --
> > Regards,
> > Aritra Basu
> >
> > On Thu, 18 Dec 2025, 5:24 pm Jarek Potiuk, <[email protected]> wrote:
> >
> > > I am very much pro exploring several options there - and I think there are several approaches that should be tried here. Increased memory usage in Airflow 3 is a fact that is indisputable. I think we do not yet understand all the reasons for it. The COW case that I initially hypothesized about (after listening to a few "core.py" podcast episodes, where I understood better how memory management in Python plays with forking) was very counter-intuitive. The fact that in our scenario garbage collection can **significantly increase** memory usage (even many times over, in your experiments) was absolutely not obvious. But your experiments, and finding out that as of Python 3.7 we even have a mechanism to handle it (gc.freeze), showed clearly that this is it.
> > >
> > > We have seen this in a number of user reports and I think we should make a deliberate effort to find those cases and optimize them. And I think some of our changes around isolation and separation of our code triggered some of the memory usage growth as a side-effect - because the impact on memory usage in interpreted languages like Python is complex, especially when we use forking as the primary mechanism to implement isolation between spawned processes. Simply speaking (and for me this was really a discovery made after listening to the "core.py" podcast by core python developers Łukasz Langa and Pablo Galindo - especially the chapters on reference counting, garbage collection, and free threading (that's for the future)) - it became clear that Python and forking do not play very well together out of the box, and you need to make quite a deliberate effort to optimise memory usage when you heavily use forking.
> > >
> > > But I think the important part here is that we need to do it step-by-step, experiment with some approaches, and see the actual impact they might have - like with the local executor case.
> > >
> > > I think there are two aspects of it:
> > >
> > > * Optimizing "airflow-induced" imports
> > >
> > > This has been a long-standing issue that we were painfully aware of, and we have been discussing fixing it for years. In Airflow 2 I made at least 2 attempts - unsuccessful - to untangle some of the spaghetti code and chains of imports that we had - it was impossible without more drastic moves. With Airflow 3 we are on a good path to fix those. I explained in https://github.com/apache/airflow/pull/58890#issuecomment-3599373034 that some of the memory usage effects we observe now will essentially be **gone** once we complete the separation of "task sdk" from "airflow" - and I would say a lot of these experiments should be performed once we complete it, because it will dramatically change the internal import behaviour: the import chains we observe today will be much shorter, more predictable, and less memory consuming. We can also likely make better decisions then on when and how to freeze gc for forking.
> > >
> > > So .. I would simply hold our horses until we complete that part.
> > >
> > > * Optimizing "external imports"
> > >
> > > This is much more doable **now**. And personally I do not even see a big issue with pre-importing external packages and doing the gc.freeze() dance in the Dag File Processor and Celery Workers - even reversing what you proposed: importing all the used packages "by default" unless the user opts out. I was thinking a lot about how we can do it and I think I have a general "high level" proposal.
> > >
> > > Few assumptions / observations:
> > >
> > > * when we import from an "installed distribution" (i.e. not from the dags folder), there is generally **no expectation** that you should be able to reload that module. A Python interpreter will by default import such packages only once, and we all know that attempting to reload such modules is generally futile - you basically need to restart the interpreter to load a new version of the module. Hot reloading is at most developer tooling (very useful, but not practical for production). So I am not too worried about pre-importing those external modules (as long as we know which modules we **should** import). I think KNOWING what to import before the fork is the most important problem to solve.
> > > * for the Dag Processor - in those forks memory grows only temporarily, so the growth is at most a spike that does not last long - so there I am less worried about memory and more about performance, and I would not worry too much about OOMs.
> > > * for Celery - where "prefork" mode is used by default - this is quite a different story: there we can see high memory usage across multiple pre-forked processes, coming either from the COW effect or from the fact that we import the same huge modules in parallel in many processes.
> > >
> > > So I think (this is again a bit of hypothesising) that we should likely use a somewhat different strategy for Dag Processors than for Celery workers.
> > >
> > > My ideas:
> > >
> > > a) for Dag Processors, I think it would be perfectly fine if we just track what's being imported in a forked parser, send the list of modules imported from real installed "distributions" (easy to check) to the Dag Processor Manager, and let it import those in its own interpreter. Then we can do the freeze/unfreeze dance before every fork (i.e. before we attempt to parse another Dag file). This might have a set of nice properties (a rough sketch of the mechanism follows the list):
> > > * we do not have to know what to import in advance - so we do not need to parse all the Dag files upfront
> > > * the first time a module is imported in a Dag, it is reported back to the Dag Processor Manager, which imports it; if we do the freeze/unfreeze dance, the next time parsing happens the import will already be in memory and not garbage-collectible - so we will not incur COW needlessly and we will reuse all that memory in the DagFileProcessor
> > > * the effect of it -> initially DagProcessorManager memory will be smaller, each parsing run will get a spike of memory for the imports it needs, then when it completes the memory is freed, but the DagProcessorManager will import the thing, grow its memory, and this will decrease memory and speed up parsing of all subsequent Dags importing the same modules
> > > * which has a nice smoothing effect: while initially there will be some spikes, over time, as more and more Dags are parsed and modules imported, this smooths out and for common imports the memory is **only** allocated in the DagFileProcessor Manager - the memory is shared by the parsing processes - and performance improves significantly
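> > >
> > > A rough sketch of that mechanism (purely illustrative - this is not the actual Dag Processor code; runpy.run_path stands in for the real parsing step and a pipe stands in for the real manager/child communication):
> > >
> > >     import gc
> > >     import importlib
> > >     import json
> > >     import os
> > >     import runpy
> > >     import sys
> > >     import sysconfig
> > >
> > >     SITE_PACKAGES = sysconfig.get_paths()["purelib"]
> > >
> > >     def modules_from_installed_distributions() -> set[str]:
> > >         # Heuristic: top-level modules whose files live under site-packages,
> > >         # i.e. installed distributions rather than the dags folder or stdlib.
> > >         found = set()
> > >         for name, mod in list(sys.modules.items()):
> > >             path = getattr(mod, "__file__", None)
> > >             if path and "." not in name and path.startswith(SITE_PACKAGES):
> > >                 found.add(name)
> > >         return found
> > >
> > >     def manager_loop(dag_files):
> > >         preloaded: set[str] = set()
> > >         for dag_file in dag_files:
> > >             gc.disable()
> > >             gc.freeze()        # everything imported so far stays shared with the child
> > >             read_fd, write_fd = os.pipe()
> > >             pid = os.fork()
> > >             if pid == 0:       # child: parse one dag file, report what got imported
> > >                 os.close(read_fd)
> > >                 gc.enable()
> > >                 runpy.run_path(dag_file)
> > >                 with os.fdopen(write_fd, "w") as out:
> > >                     json.dump(sorted(modules_from_installed_distributions()), out)
> > >                 os._exit(0)
> > >             os.close(write_fd)
> > >             with os.fdopen(read_fd) as src:
> > >                 reported = set(json.load(src))
> > >             os.waitpid(pid, 0)
> > >             gc.unfreeze()
> > >             gc.enable()
> > >             # Import what the child reported, so the *next* fork gets it for free.
> > >             for name in reported - preloaded:
> > >                 importlib.import_module(name)
> > >                 preloaded.add(name)
> > >
> > > The manager would run something like manager_loop() over the files in the dags folder as part of its existing parsing loop - whether the report travels over the existing child/manager channel or something else is an open question.
> > >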
> > > b) for Celery workers, I think it would make sense to do something a bit more sophisticated but similar. The problem is that those pre-forked processes might be (and by default are) long running - they are "pre" forked. And there we do not know (before we parse all the files) which modules actually need to be pre-imported before we fork them. This is a tough one to solve in the generic case, because celery workers generally do not parse "all" the Dag files (unlike Dag File Processors) - they generally only parse the particular Dags they execute (which, for example with queues, might be a small subset of Dags). In pre-fork mode, Celery spawns `concurrency` child processes (https://docs.celeryq.dev/en/latest/userguide/workers.html#concurrency) to run the tasks. Here I think we can use a similar strategy as for Dag Processors - i.e. track all imports and get them imported by the "parent" celery worker, so that the next fork uses them, and we could do the same freeze/unfreeze dance while forking (I think Celery has hooks to allow that) - but only under one condition: that we restart the forked processes from time to time. Which.... is actually possible, for example with https://docs.celeryq.dev/en/latest/userguide/configuration.html#std-setting-worker_max_tasks_per_child - which restarts a child process after it has processed x tasks. So far we have not set that config as a default (in our docs we explain that you can do it with custom configuration) - but if we do this "import gc dance", it is pretty much a prerequisite for making use of the modules that we dynamically pre-import in the "parent" process - otherwise all the forks will just run forever and each of them will keep its own copy of the memory used by the imports in its process.
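> > >
> > > A hedged sketch of what that could look like with today's Celery knobs (the preload list is an assumption, and the sketch assumes the worker_init signal fires in the parent worker before the prefork pool spawns its children - worth verifying; the signal names and the worker_max_tasks_per_child setting are real Celery APIs):
> > >
> > >     import gc
> > >     import importlib
> > >
> > >     from celery import Celery
> > >     from celery.signals import worker_init, worker_process_init
> > >
> > >     app = Celery("airflow_worker")
> > >     # Recycle children so newly pre-imported modules actually reach fresh forks.
> > >     app.conf.worker_max_tasks_per_child = 100
> > >
> > >     # Example list of heavy modules we decided (or learned) to preload.
> > >     PRELOAD = ["pandas", "numpy"]
> > >
> > >     @worker_init.connect
> > >     def preload_and_freeze(**kwargs):
> > >         # Parent worker process, at startup: import the heavy stuff once,
> > >         # then keep it out of GC so forked children share those pages.
> > >         for name in PRELOAD:
> > >             importlib.import_module(name)
> > >         gc.disable()
> > >         gc.freeze()
> > >
> > >     @worker_process_init.connect
> > >     def enable_gc_in_child(**kwargs):
> > >         # Each pre-forked child, right after fork: collect only what it creates itself.
> > >         gc.enable()
> > >
> > > Whether worker_init fires early enough in every pool implementation, and how the "learned" preload list would get persisted, is exactly the kind of thing the experiments should confirm.
> > >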
> > > Both a) and b) have the nice property that, in case of any issues or newly installed versions of distributions, a restart of those processes will start the whole thing from the beginning - the imports will be gradually loaded and the forks will gradually make use of the pre-imports done by the "parent" process (until the next restart).
> > >
> > > My hypothesis is that if we do this we can achieve all three goals: decrease memory usage overall, decrease memory spikes over time, and increase performance.
> > >
> > > But all that needs to be tested of course.
> > >
> > > J.
> > >
> > > On Thu, Dec 18, 2025 at 10:30 AM Aritra Basu <[email protected]> wrote:
> > >
> > > > This is quite an interesting read Jeongwoo, I don't yet have a strong opinion on this except that it's worth checking out. I'll reread it a couple of times and hopefully come up with some thoughts, but your investigation so far looks quite interesting.
> > > >
> > > > --
> > > > Regards,
> > > > Aritra Basu
> > > >
> > > > On Thu, 18 Dec 2025, 2:08 pm Jeongwoo Do, <[email protected]> wrote:
> > > >
> > > > > Hello Airflow community,
> > > > >
> > > > > While working on resolving a memory leak issue in the LocalExecutor [1], I observed that garbage collection (GC) in forked subprocesses was triggering Copy-On-Write (COW) on shared memory, which significantly increased each process's PSS. By using gc.freeze to move objects created at subprocess startup into the GC permanent generation, I was able to mitigate this issue effectively.
> > > > >
> > > > > I would like to propose applying the same approach to the Dag processor to address GC-related performance issues and improve stability in subprocesses. Below are the expected benefits.
> > > > >
> > > > > Preventing COW on Shared Memory
> > > > > Unlike the LocalExecutor, where subprocesses are long-lived, Dag processor subprocesses are not permanent. However, with the increasing adoption of dynamic Dags, parsing time has become longer in many cases. GC activity during parsing can trigger COW on shared memory, leading to memory spikes. In containerized environments, these spikes can result in OOM events.
> > > > >
> > > > > Improving GC Performance
> > > > > Applying gc.freeze marks existing objects as non-GC targets. As a result, it greatly lowers the frequency of threshold-based GC runs and makes GC much faster when it does occur. In a simple experiment, I observed GC time dropping from roughly 1 second to about 1 microsecond (with GC forced via gc.collect).
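> > > > >
> > > > > (Roughly the kind of measurement behind that number - a toy benchmark, not the exact experiment; absolute timings depend on heap size:)
> > > > >
> > > > >     import gc
> > > > >     import time
> > > > >
> > > > >     # Build a large heap of GC-tracked objects, similar to what heavy imports leave behind.
> > > > >     data = [{"i": i, "payload": [i] * 5} for i in range(1_000_000)]
> > > > >
> > > > >     t0 = time.perf_counter()
> > > > >     gc.collect()
> > > > >     print(f"collect over the full heap:  {time.perf_counter() - t0:.4f}s")
> > > > >
> > > > >     gc.freeze()   # move everything allocated so far into the permanent generation
> > > > >
> > > > >     t0 = time.perf_counter()
> > > > >     gc.collect()  # now only traverses objects created after the freeze
> > > > >     print(f"collect after gc.freeze():   {time.perf_counter() - t0:.6f}s")
> > > > >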
> > > > > Eliminating GC-Related Issues in Child Processes
> > > > > Similar to the issue in [2], GC triggered at arbitrary points in child processes can affect shared objects inherited from the parent. By ensuring that parent-owned objects are not subject to GC in children, these issues can be avoided entirely.
> > > > >
> > > > > Beyond the immediate performance and stability improvements, increased memory stability also enables further optimizations. For example, preloading heavy modules in the parent process can eliminate repeated memory loading in each child process. This approach has been discussed previously in [3], and preloading Airflow modules is already partially implemented today.
> > > > >
> > > > > While [3] primarily focused on parsing time, the broader benefit is reduced CPU and memory usage overall. Extending this idea beyond Airflow modules, allowing users to pre-import libraries used in DAG files, could provide significant performance gains.
> > > > >
> > > > > That said, it is also clear why this has not been broadly adopted so far. Persistently importing problematic libraries defined in DAG files could introduce side effects, and unloading modules once loaded is difficult. In environments with frequent DAG changes, this can become a burden.
> > > > >
> > > > > For this reason, I believe the optimal approach is to allow pre-importing only explicitly user-approved libraries. Users would define which libraries to preload via configuration. These libraries would be loaded lazily, and only after they are successfully loaded in a child process would they be loaded in the parent process as well. The pre-import mechanism I proposed recently in [4] may be helpful here.
> > > > >
> > > > > In summary, I am proposing two items:
> > > > >
> > > > > 1. Apply gc.freeze to the DAG processor.
> > > > > 2. Then, allow user-aware and intentional preloading of libraries.
> > > > >
> > > > > Thank you for taking the time to read this. If this proposal requires an AIP, I would be happy to prepare one.
> > > > >
> > > > > [1] https://github.com/apache/airflow/pull/58365
> > > > > [2] https://github.com/apache/airflow/issues/56879
> > > > > [3] https://github.com/apache/airflow/pull/30495
> > > > > [4] https://github.com/apache/airflow/pull/58890
> > > > >
> > > > > Best Regards,
> > > > > Jeongwoo Do
