Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-19 Thread via GitHub


Lzzz666 commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2986907981

   Fixed CI problems!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-18 Thread via GitHub


Lee-W commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2986430283

   Looks good! @Lzzz666 could you please take a look at the CI failure? Thanks!





Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-17 Thread via GitHub


Lzzz666 commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2979352249

   I ran a benchmark comparing three loading strategies:
   
   1. Pre-import before the parsing loop
   Call `_pre_import_airflow_modules()` once before entering the while loop in `_run_parsing_loop()`.
   2. Pre-import before fork
   Call it right before each fork.
   3. No pre-import
   
   **Setup**
   
   * 1,000 parse iterations per experiment
   * Loaded almost all core Airflow modules plus numpy, pandas, celery, and k8s (no providers, api, www, cli)
   * Averaged over the 1,000 parses (first run excluded)
   * Measured
   ```python
   process_creation_time = time.monotonic() - process_start_time
   ```
   immediately after `self._start_new_processes()`
   
   **Results**
   
   “Before parsing loop” vs “before fork”
   
   * Moving the pre-import outside the parsing loop reduced `process_creation_time` by **92.8%** compared to doing it before each fork.
   
   Baseline vs “before parsing loop”
   
   * No pre-import at all was about 29% faster than pre-importing before the loop, but the absolute time saved was tiny and is probably just normal fork timing noise.
   
   
![image](https://github.com/user-attachments/assets/f02a4f07-9eb6-4f38-a932-245c2e7df23e)
   
   **Conclusion**
   
   If my experimental setup is correct, and pre-importing before the parse loop doesn’t introduce any unintended side effects, it could offer a worthwhile efficiency improvement.
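For anyone trying to reproduce the setup, the measurement above can be sketched as a small helper. This is illustrative only: `timed_call` and the stand-in callable are not part of Airflow; the real code times `self._start_new_processes()` inside the manager.

```python
import time


def timed_call(fn):
    """Wall-clock a callable with time.monotonic(), mirroring the
    process_creation_time measurement described above.

    `fn` stands in for self._start_new_processes(); the name is
    illustrative, not the actual Airflow API.
    """
    process_start_time = time.monotonic()
    fn()
    process_creation_time = time.monotonic() - process_start_time
    return process_creation_time


# Example: timing a trivial stand-in for process creation
elapsed = timed_call(lambda: sum(range(10_000)))
assert elapsed >= 0.0
```

Since `time.monotonic()` is unaffected by system clock adjustments, it is the right choice for interval measurements like this.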





Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-16 Thread via GitHub


Lee-W commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2978686419

   > I’m not entirely sure, but if we want to pre-load all the core Airflow modules—instead of, as before, pre-loading only the modules required by the current dag file before forking—should we place the pre-import function before the parse loop? This way, we can avoid redundant imports.
   
   We’ll probably need to benchmark again to see whether we want it. I feel it probably wouldn’t affect much, but I’m not sure.





Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-16 Thread via GitHub


Lzzz666 commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2977085285

   Correct me if I’m wrong, but if we want to pre-load all the core Airflow modules, we should place the pre-import function before the parse loop. This way, we can avoid redundant imports.





Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-16 Thread via GitHub


jason810496 commented on code in PR #50371:
URL: https://github.com/apache/airflow/pull/50371#discussion_r2150016679


##
airflow-core/src/airflow/configuration.py:
##
@@ -347,6 +347,7 @@ def sensitive_config_values(self) -> set[tuple[str, str]]:
 # DeprecationWarning will be issued and the old option will be used instead
 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
 ("dag_processor", "refresh_interval"): ("scheduler", 
"dag_dir_list_interval", "3.0"),
+("dag_processor", "parsing_pre_import_modules"): ("scheduler", 
"parsing_pre_import_modules", "3.1.0"),

Review Comment:
   nit:
   Should we move this row to the bottom, since the entries are currently ordered by change version?






Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-16 Thread via GitHub


jason810496 commented on code in PR #50371:
URL: https://github.com/apache/airflow/pull/50371#discussion_r2150048695


##
airflow-core/src/airflow/dag_processing/processor.py:
##
@@ -253,6 +281,15 @@ def start(  # type: ignore[override]
 client: Client,
 **kwargs,
 ) -> Self:
+# Pre-import Airflow modules in parent process to save CPU time and 
memory
+# This prevents them from being re-imported from zero in each 
processing subprocess
+import structlog
+
+log = structlog.get_logger(logger_name="dag_processor")

Review Comment:
   My mistake: before moving the `_pre_import_airflow_modules` call to the line before `proc: Self = super().start()`, we could get `log` from the function’s argument. After moving it there, IMO it’s fine to use `structlog.get_logger` to retrieve the logger, and that will resolve most of the test errors.






Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-16 Thread via GitHub


ephraimbuddy commented on code in PR #50371:
URL: https://github.com/apache/airflow/pull/50371#discussion_r2149589600


##
airflow-core/src/airflow/configuration.py:
##
@@ -347,6 +347,7 @@ def sensitive_config_values(self) -> set[tuple[str, str]]:
 # DeprecationWarning will be issued and the old option will be used instead
 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
 ("dag_processor", "refresh_interval"): ("scheduler", 
"dag_dir_list_interval", "3.0"),
+("dag_processor", "parsing_pre_import_modules"): ("scheduler", 
"parsing_pre_import_modules", "3.0.2"),

Review Comment:
   ```suggestion
   ("dag_processor", "parsing_pre_import_modules"): ("scheduler", 
"parsing_pre_import_modules", "3.1.0"),
   ```
   Let's plan this for 3.1.0 since it's not a bug fix but an improvement






Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-16 Thread via GitHub


ephraimbuddy commented on code in PR #50371:
URL: https://github.com/apache/airflow/pull/50371#discussion_r2149257046


##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -2485,3 +2485,13 @@ dag_processor:
   type: integer
   example: ~
   default: "10"
+parsing_pre_import_modules:

Review Comment:
   You can delete the scheduler section of this and update this list: 
https://github.com/apache/airflow/blob/3222862aafd41936f9152f770851e09ca77c7e3f/airflow-core/src/airflow/configuration.py#L348






Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-15 Thread via GitHub


Lzzz666 commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2975058197

   I found that with the current pre_import implementation (#30495), the performance gains are negligible, probably because it only pre-imports the Airflow modules actually used in each DAG. In theory this should still speed things up, but my benchmarks didn’t show any improvement. However, when I modified the pre_import function to preload only the “heavier” third-party libraries (NumPy, pandas, Kubernetes, and Celery), the speed-up became very noticeable.
   
   All of my tests involved parsing the same DAG ten times and measuring `run_duration` (defined as `run_duration = time.monotonic() - proc.start_time`).
   
   1. First test with the original pre-import method
   
![image](https://github.com/user-attachments/assets/42aa4256-58ec-4304-878d-06f61ccf0fee)
   
   2. Second test with the original pre-import method
   
![image](https://github.com/user-attachments/assets/60935221-9fdc-4948-993c-d5c5c4a75b24)
   
   3. Modified pre-import that preloads only NumPy, pandas, Kubernetes, and Celery
   
![image](https://github.com/user-attachments/assets/d2cd44a8-0601-41b4-ba29-2daab6ac96ce)
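The per-parse methodology (parse the same DAG several times, average the monotonic-clock durations) can be sketched roughly like this. `parse` is a placeholder for one parse of the DAG file, not the real processor call:

```python
import time


def average_run_duration(parse, runs=10):
    """Average wall-clock duration of parse() over `runs` runs,
    timed the same way run_duration is computed above:
    a time.monotonic() delta from the start time."""
    durations = []
    for _ in range(runs):
        start_time = time.monotonic()
        parse()
        durations.append(time.monotonic() - start_time)
    return sum(durations) / len(durations)


# Example with a trivial stand-in workload
avg = average_run_duration(lambda: sum(range(50_000)), runs=10)
assert avg > 0.0
```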
   





Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-15 Thread via GitHub


jason810496 commented on code in PR #50371:
URL: https://github.com/apache/airflow/pull/50371#discussion_r2148335724


##
airflow-core/src/airflow/dag_processing/processor.py:
##
@@ -253,6 +281,15 @@ def start(  # type: ignore[override]
 client: Client,
 **kwargs,
 ) -> Self:
+# Pre-import Airflow modules in parent process to save CPU time and 
memory
+# This prevents them from being re-imported from zero in each 
processing subprocess
+import structlog
+
+log = structlog.get_logger(logger_name="dag_processor")

Review Comment:
   I think we can use the `log` passed in as an argument instead of getting the logger again.



##
airflow-core/src/airflow/dag_processing/processor.py:
##
@@ -99,6 +101,31 @@ class DagFileParsingResult(BaseModel):
 ]
 
 
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> 
None:
+"""
+Pre-import Airflow modules found in the given file.
+
+This prevents modules from being re-imported in each processing process,
+saving CPU time and memory.
+
+Args:
+file_path: Path to the file to scan for imports
+log: Logger instance to use for warnings
+
+parsing_pre_import_modules:
+ default value is True
+"""
+if not conf.getboolean("dag_processor", "parsing_pre_import_modules", 
fallback=True):
+return
+
+log.info(f"[Debug by lz] dag_processor.parsing_pre_import_modules is 
{conf.getboolean('dag_processor', 'parsing_pre_import_modules', 
fallback=True)}")   

Review Comment:
   ```suggestion
   ```






Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-13 Thread via GitHub


Lzzz666 commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2971286687

   I’ll refer to the experiment in #30495 and provide benchmark results.





Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-13 Thread via GitHub


ephraimbuddy commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2969785085

   > Correct me if I’m wrong, but when I traced the code, it appears that `proc: Self = super().start()` forks the parent process to create the child process. If the goal of the pre-import is to load modules in the parent before forking, it might make more sense to place it before `proc: Self = super().start()`.
   > 
   > ```text
   > DagFileProcessorManager.run()
   > └─ run_parsing_loop()
   >    └─ while loop:
   >       └─ start_new_processes()
   >          └─ create_process(dag_path)
   >             └─ DagFileProcessorProcess.start()
   >                └─ proc: Self = super().start(target=target, client=client, **kwargs)  # fork
   > ```
   
   Your trace is right, but I feel we should have a verifiable reason to re-implement the pre-import. If you can spend some time showcasing the issue, I believe it will help all of us find the justification for bringing it back.





Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-13 Thread via GitHub


Lzzz666 commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2969437961

   Correct me if I’m wrong, but when I traced the code, it appears that `proc: Self = super().start()` forks the parent process to create the child process. If the goal of the pre-import is to load modules in the parent before forking, it might make more sense to place it before `proc: Self = super().start()`.
   
   ```text
   DagFileProcessorManager.run()
   └─ run_parsing_loop()
      └─ while loop:
         └─ start_new_processes()
            └─ create_process(dag_path)
               └─ DagFileProcessorProcess.start()
                  └─ proc: Self = super().start(target=target, client=client, **kwargs)  # fork
   ```
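The reason placement relative to the fork matters can be shown with a minimal stdlib sketch (Unix-only, and not the Airflow code path; stdlib modules stand in for the heavy imports, and `spawn_parser` is an illustrative name):

```python
import importlib
import os
import sys

# Stand-ins for heavy modules (numpy, pandas, ...); stdlib is used so
# the sketch stays self-contained.
HEAVY_MODULES = ["json", "decimal"]


def spawn_parser(pre_import: bool) -> int:
    """Fork a 'parser' child and return its exit code.

    If pre_import is True, the modules are imported in the parent
    first, so the child inherits sys.modules via copy-on-write and
    its own imports become cache hits.
    """
    if pre_import:
        for name in HEAVY_MODULES:
            importlib.import_module(name)
    pid = os.fork()
    if pid == 0:  # child process
        ok = all(name in sys.modules for name in HEAVY_MODULES)
        os._exit(0 if ok else 1)
    _, status = os.waitpid(pid, 0)
    return os.waitstatus_to_exitcode(status)


# With pre-import, the child sees the modules already loaded.
assert spawn_parser(pre_import=True) == 0
```

Because `sys.modules` is inherited at fork time, anything imported before `super().start()` is effectively free in every child, whereas anything imported after the fork is paid for per child.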





Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-12 Thread via GitHub


Lzzz666 commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2968908245

   Hi @ephraimbuddy, I’m not familiar with the scheduler, but based on the [Dag File Processing](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/dagfile-processing.html) documentation, I think the pre-import function is executed in the DagFileProcessorProcess stage, which runs in the parent process before forking. So I tried adding the pre-import to `parse_file()`, but I now realize that’s not the right place.





Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-11 Thread via GitHub


ephraimbuddy commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2963883416

   > > @Lzzz666 would you be able to take a look at the review comments on this PR?
   > 
   > @amoghrajesh I might need some help or discussion regarding the location of the pre-import!
   
   Could you clarify what you mean by the “location of the pre-import”? Implementing pre-import in Airflow 2 doesn’t automatically require us to carry it over to Airflow 3. Airflow 3 uses a dedicated standalone DAG processor, whereas Airflow 2 still straddled scheduler-based and standalone dag parsing. Before moving forward, can you point out the specific issue you’re seeing or share the error messages?





Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-10 Thread via GitHub


Lee-W commented on code in PR #50371:
URL: https://github.com/apache/airflow/pull/50371#discussion_r2137327413


##
airflow-core/src/airflow/dag_processing/processor.py:
##
@@ -94,8 +96,35 @@ def _parse_file_entrypoint():
 comms_decoder.send_request(log, result)
 
 
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> 
None:
+"""
+Pre-import Airflow modules found in the given file.
+
+This prevents modules from being re-imported in each processing process,
+saving CPU time and memory.
+
+Args:
+file_path: Path to the file to scan for imports
+log: Logger instance to use for warnings
+
+parsing_pre_import_modules:
+ default value is True
+"""
+if not conf.getboolean("scheduler", "parsing_pre_import_modules", 
fallback=True):

Review Comment:
   I think what @ephraimbuddy means is something like `if not 
conf.getboolean("dag_processor", "parsing_pre_import_modules", fallback=True):`






Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-10 Thread via GitHub


Lzzz666 commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2958254220

   > @Lzzz666 would you be able to take a look at the review comments on this 
PR?
   
   @amoghrajesh I might need some help or discussion regarding the location of 
the pre-import!





Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-06-03 Thread via GitHub


amoghrajesh commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2938813014

   @Lzzz666 would you be able to take a look at the review comments on this PR?





Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-05-22 Thread via GitHub


eladkal commented on code in PR #50371:
URL: https://github.com/apache/airflow/pull/50371#discussion_r2102327257


##
airflow-core/src/airflow/dag_processing/processor.py:
##
@@ -94,8 +96,35 @@ def _parse_file_entrypoint():
 comms_decoder.send_request(log, result)
 
 
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> 
None:
+"""
+Pre-import Airflow modules found in the given file.
+
+This prevents modules from being re-imported in each processing process,
+saving CPU time and memory.
+
+Args:
+file_path: Path to the file to scan for imports
+log: Logger instance to use for warnings
+
+parsing_pre_import_modules:
+ default value is True
+"""
+if not conf.getboolean("scheduler", "parsing_pre_import_modules", 
fallback=True):
+return
+
+for module in iter_airflow_imports(file_path):
+try:
+importlib.import_module(module)
+except ModuleNotFoundError as e:
+log.warning("Error when trying to pre-import module '%s' found in 
%s: %s", module, file_path, e)

Review Comment:
   Correct me if I am wrong, but if an import fails here, the dag processor will never try to re-import that module until the process is restarted. I think we need to change this.
   It may also explain why, in some cases on 2.x, I always see many dags marked as broken with import errors on my own modules after a dag processor restart.
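One data point on the retry question: a quick stdlib check (a sketch, not the Airflow code path) suggests CPython does not cache a `ModuleNotFoundError` in `sys.modules`, so a later parse attempt would re-run the import machinery from scratch. Whether the dag processor's control flow actually retries is a separate question.

```python
import importlib
import sys


def try_pre_import(module_name: str) -> bool:
    # Mirrors the except clause quoted above: swallow the error and
    # report failure instead of raising.
    try:
        importlib.import_module(module_name)
        return True
    except ModuleNotFoundError:
        return False


# A missing module fails...
assert try_pre_import("definitely_not_a_real_module_xyz") is False
# ...but the failure is not cached: the name stays absent from
# sys.modules, so a subsequent attempt would retry the full import.
assert "definitely_not_a_real_module_xyz" not in sys.modules
assert try_pre_import("json") is True
```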






Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-05-19 Thread via GitHub


Lee-W commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2891317453

   I just revoked my approval.





Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-05-19 Thread via GitHub


ephraimbuddy commented on code in PR #50371:
URL: https://github.com/apache/airflow/pull/50371#discussion_r2095162188


##
airflow-core/src/airflow/dag_processing/processor.py:
##
@@ -94,8 +96,35 @@ def _parse_file_entrypoint():
 comms_decoder.send_request(log, result)
 
 
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> 
None:
+"""
+Pre-import Airflow modules found in the given file.
+
+This prevents modules from being re-imported in each processing process,
+saving CPU time and memory.
+
+Args:
+file_path: Path to the file to scan for imports
+log: Logger instance to use for warnings
+
+parsing_pre_import_modules:
+ default value is True
+"""
+if not conf.getboolean("scheduler", "parsing_pre_import_modules", 
fallback=True):
+return
+
+for module in iter_airflow_imports(file_path):
+try:
+importlib.import_module(module)
+except ModuleNotFoundError as e:
+log.warning("Error when trying to pre-import module '%s' found in 
%s: %s", module, file_path, e)
+
+
 def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> 
DagFileParsingResult | None:
 # TODO: Set known_pool names on DagBag!
+
+_pre_import_airflow_modules(msg.file, log)

Review Comment:
   Doing this per file seems wrong; it should happen at the start of the DAG processor. Have you benchmarked this against the current dag processor?






Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-05-19 Thread via GitHub


ephraimbuddy commented on code in PR #50371:
URL: https://github.com/apache/airflow/pull/50371#discussion_r2095147877


##
airflow-core/src/airflow/dag_processing/processor.py:
##
@@ -94,8 +96,35 @@ def _parse_file_entrypoint():
 comms_decoder.send_request(log, result)
 
 
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> 
None:
+"""
+Pre-import Airflow modules found in the given file.
+
+This prevents modules from being re-imported in each processing process,
+saving CPU time and memory.
+
+Args:
+file_path: Path to the file to scan for imports
+log: Logger instance to use for warnings
+
+parsing_pre_import_modules:
+ default value is True
+"""
+if not conf.getboolean("scheduler", "parsing_pre_import_modules", 
fallback=True):

Review Comment:
   This should be a dag processor config and not scheduler






Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-05-19 Thread via GitHub


Lee-W commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2890044105

   @jedcunningham @ephraimbuddy not sure whether this was missed or intentionally removed, but it looks good to me. I’m planning to merge it early tomorrow.





Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-05-19 Thread via GitHub


Lee-W commented on code in PR #50371:
URL: https://github.com/apache/airflow/pull/50371#discussion_r2095094174


##
airflow-core/src/airflow/dag_processing/processor.py:
##
@@ -94,8 +96,35 @@ def _parse_file_entrypoint():
 comms_decoder.send_request(log, result)
 
 
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> 
None:
+"""
+Pre-import Airflow modules found in the given file.
+
+This prevents modules from being re-imported in each processing process,
+saving CPU time and memory.
+
+Args:
+file_path: Path to the file to scan for imports
+log: Logger instance to use for warnings
+
+parsing_pre_import_modules:
+ default value is True

Review Comment:
   ```suggestion
   ```



##
airflow-core/tests/unit/dag_processing/test_processor.py:
##
@@ -367,6 +373,61 @@ def test_import_module_in_bundle_root(self, tmp_path: 
pathlib.Path, inprocess_cl
 assert result.import_errors == {}
 assert result.serialized_dags[0].dag_id == "dag_name"
 
+def test__pre_import_airflow_modules_when_disabled(self, mock_logger):
+with (
+patch("airflow.configuration.conf.getboolean", return_value=False),

Review Comment:
   making it a fixture would probably make the tests easier to read



##
airflow-core/tests/unit/dag_processing/test_processor.py:
##
@@ -367,6 +373,61 @@ def test_import_module_in_bundle_root(self, tmp_path: 
pathlib.Path, inprocess_cl
 assert result.import_errors == {}
 assert result.serialized_dags[0].dag_id == "dag_name"
 
+def test__pre_import_airflow_modules_when_disabled(self, mock_logger):
+with (
+patch("airflow.configuration.conf.getboolean", return_value=False),
+patch("airflow.dag_processing.processor.iter_airflow_imports") as 
mock_iter,
+):
+_pre_import_airflow_modules("test.py", mock_logger)
+
+mock_iter.assert_not_called()
+mock_logger.warning.assert_not_called()
+
+def test__pre_import_airflow_modules_when_enabled(self, mock_logger):
+with (
+patch("airflow.configuration.conf.getboolean", return_value=True),
+patch("airflow.dag_processing.processor.iter_airflow_imports", 
return_value=["airflow.models"]),
+patch("airflow.dag_processing.processor.importlib.import_module") 
as mock_import,
+):
+_pre_import_airflow_modules("test.py", mock_logger)
+
+mock_import.assert_called_once_with("airflow.models")
+mock_logger.warning.assert_not_called()
+
+def test__pre_import_airflow_modules_warns_on_missing_module(self, 
mock_logger):
+with (
+patch("airflow.configuration.conf.getboolean", return_value=True),

Review Comment:
   same here



##
airflow-core/tests/unit/dag_processing/test_processor.py:
##
@@ -367,6 +373,61 @@ def test_import_module_in_bundle_root(self, tmp_path: 
pathlib.Path, inprocess_cl
 assert result.import_errors == {}
 assert result.serialized_dags[0].dag_id == "dag_name"
 
+def test__pre_import_airflow_modules_when_disabled(self, mock_logger):
+with (
+patch("airflow.configuration.conf.getboolean", return_value=False),

Review Comment:
   after a second thought, something like 
https://github.com/apache/airflow/blob/2156fbe00320218b071c76f57b527a8794f71c9c/airflow-core/tests/unit/jobs/test_scheduler_job.py#L116
 might make more sense
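   For context, the pattern in the linked `test_scheduler_job.py` is a `conf_vars`-style override. A self-contained sketch of how such a helper behaves (the real one lives in Airflow's shared test utilities and patches the real config; the `_CONF` dict here is just an illustrative stand-in):

   ```python
   from contextlib import contextmanager

   # Illustrative stand-in for Airflow's config store; the real conf_vars
   # helper mutates airflow.configuration.conf instead.
   _CONF = {("scheduler", "parsing_pre_import_modules"): "True"}


   @contextmanager
   def conf_vars(overrides):
       """Temporarily apply config overrides, restoring originals on exit."""
       original = {}
       for key, value in overrides.items():
           original[key] = _CONF.get(key)
           _CONF[key] = value
       try:
           yield
       finally:
           for key, value in original.items():
               if value is None:
                   _CONF.pop(key, None)
               else:
                   _CONF[key] = value
   ```

   Used as `with conf_vars({("scheduler", "parsing_pre_import_modules"): "False"}): ...`, which keeps each test's config intent readable instead of spelling out a patch target.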



##
airflow-core/tests/unit/dag_processing/test_processor.py:
##
@@ -367,6 +373,61 @@ def test_import_module_in_bundle_root(self, tmp_path: 
pathlib.Path, inprocess_cl
 assert result.import_errors == {}
 assert result.serialized_dags[0].dag_id == "dag_name"
 
+def test__pre_import_airflow_modules_when_disabled(self, mock_logger):
+with (
+patch("airflow.configuration.conf.getboolean", return_value=False),
+patch("airflow.dag_processing.processor.iter_airflow_imports") as 
mock_iter,
+):
+_pre_import_airflow_modules("test.py", mock_logger)
+
+mock_iter.assert_not_called()
+mock_logger.warning.assert_not_called()
+
+def test__pre_import_airflow_modules_when_enabled(self, mock_logger):
+with (
+patch("airflow.configuration.conf.getboolean", return_value=True),

Review Comment:
   same here



##
airflow-core/src/airflow/dag_processing/processor.py:
##
@@ -94,8 +96,35 @@ def _parse_file_entrypoint():
 comms_decoder.send_request(log, result)
 
 
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> 
None:
+"""
+Pre-import Airflow modules found in the given file.
+
+This prevents modules from being re-imported in each processing process,
+saving CPU time and memory.

Review Comment:
   ```suggestion
   saving CPU time and memory.
   (The default v
   ```

Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-05-12 Thread via GitHub


Lzzz666 commented on PR #50371:
URL: https://github.com/apache/airflow/pull/50371#issuecomment-2875259461

   > Also we might need to add a test for it
   
   I’ll try adding a unit test for it.





Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-05-12 Thread via GitHub


Lee-W commented on code in PR #50371:
URL: https://github.com/apache/airflow/pull/50371#discussion_r2084836196


##
airflow-core/src/airflow/dag_processing/processor.py:
##
@@ -94,6 +96,21 @@ def _parse_file_entrypoint():
 
 def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> 
DagFileParsingResult | None:
 # TODO: Set known_pool names on DagBag!
+
+# Pre-import modules
+# Read the file to pre-import airflow modules used.
+# This prevents them from being re-imported from zero in each "processing" 
process
+# and saves CPU time and memory.
+
+if conf.getboolean("scheduler", "parsing_pre_import_modules", 
fallback=True):
+for module in iter_airflow_imports(msg.file):
+try:
+importlib.import_module(module)
+except Exception as e:

Review Comment:
   ```suggestion
   except Exception as e:
   ```
   
   We should probably use a concrete exception here instead
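   A hedged sketch of that suggestion, self-contained so it runs without Airflow (`pre_import_modules` and its parameters are illustrative stand-ins for the loop in `_parse_file`):

   ```python
   import importlib


   def pre_import_modules(modules, warn):
       """Illustrative stand-in for the pre-import loop; `modules` and
       `warn` are injected so the sketch runs without Airflow."""
       for module in modules:
           try:
               importlib.import_module(module)
           except ImportError as e:
               # ImportError covers ModuleNotFoundError (its subclass), so a
               # missing module is logged instead of crashing the parse.
               warn(f"Error when trying to pre-import module '{module}': {e}")
   ```

   One trade-off worth noting: `ImportError` catches missing or broken imports, but a module whose import-time code raises some other exception would still propagate — which may be why the original code used a blanket `except Exception`.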






Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-05-10 Thread via GitHub


jason810496 commented on code in PR #50371:
URL: https://github.com/apache/airflow/pull/50371#discussion_r2083387680


##
airflow-core/src/airflow/dag_processing/processor.py:
##
@@ -65,6 +68,8 @@
 Field(discriminator="type"),
 ]
 
+console = Console()

Review Comment:
   nit: Is this an accidental import?






Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-05-09 Thread via GitHub


jason810496 commented on code in PR #50371:
URL: https://github.com/apache/airflow/pull/50371#discussion_r2082772825


##
airflow-core/src/airflow/dag_processing/processor.py:
##
@@ -94,6 +94,18 @@ def _parse_file_entrypoint():
 
 def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> 
DagFileParsingResult | None:
 # TODO: Set known_pool names on DagBag!
+# Pre-import modules
+if conf.getboolean("scheduler", "parsing_pre_import_modules", 
fallback=True):
+import importlib
+
+from airflow.utils.file import iter_airflow_imports
+
+for module in iter_airflow_imports(msg.file):

Review Comment:
   I just double checked the `for module in ..` part with 
   
   
https://github.com/apache/airflow/pull/30495/files#diff-3101df8e7c54fe1b9305774befb54b31edfe84cb81425a300e70142bc07637d4R192-R208
   
   Yes, we should add one more level of indentation.
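   To make the indentation point concrete, a minimal sketch with the Airflow dependencies injected as parameters (the names are illustrative, not the real signature) — the `for` loop belongs one level inside the `if`, so nothing is pre-imported when the option is off:

   ```python
   def parse_file(file_path, pre_import_enabled, iter_imports, import_module, warn):
       # Pre-import modules only when [scheduler] parsing_pre_import_modules
       # is enabled; note the for-loop is nested inside the if-block.
       if pre_import_enabled:
           for module in iter_imports(file_path):
               try:
                   import_module(module)
               except Exception as e:
                   warn(f"Error when trying to pre-import module '{module}' found in {file_path}: {e}")
       # ... the actual DAG file parsing would continue here ...
   ```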



##
airflow-core/src/airflow/dag_processing/processor.py:
##
@@ -94,6 +94,18 @@ def _parse_file_entrypoint():
 
 def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> 
DagFileParsingResult | None:
 # TODO: Set known_pool names on DagBag!
+# Pre-import modules
+if conf.getboolean("scheduler", "parsing_pre_import_modules", 
fallback=True):
+import importlib
+
+from airflow.utils.file import iter_airflow_imports

Review Comment:
   nit: I think we can move the imports to the top level?
   
   Also, the comments in #30495 are useful; we can port them back as well.






Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]

2025-05-09 Thread via GitHub


Lee-W commented on code in PR #50371:
URL: https://github.com/apache/airflow/pull/50371#discussion_r2081758552


##
airflow-core/src/airflow/dag_processing/processor.py:
##
@@ -94,6 +94,18 @@ def _parse_file_entrypoint():
 
 def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> 
DagFileParsingResult | None:
 # TODO: Set known_pool names on DagBag!
+# Pre-import modules
+if conf.getboolean("scheduler", "parsing_pre_import_modules", 
fallback=True):
+import importlib
+
+from airflow.utils.file import iter_airflow_imports
+
+for module in iter_airflow_imports(msg.file):

Review Comment:
   Not really familiar with this area, but the indentation here looks weird 🤔 should we add one more level of indentation?


