Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lzzz666 commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2986907981

Fixed CI problems!
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lee-W commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2986430283

Looks good! @Lzzz666 could you please take a look at the CI failure? Thanks!
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lzzz666 commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2979352249

I ran a benchmark comparing three loading strategies:

1. Pre-import before the parsing loop: call `_pre_import_airflow_modules()` once before entering the while loop in `_run_parsing_loop()`.
2. Pre-import before the fork: call it right before each fork.
3. No pre-import.

**Setup**

* 1,000 parse iterations per experiment
* Loaded almost all core Airflow modules plus numpy, pandas, celery, and k8s (no providers, api, www, cli)
* Averaged over 1,000 parses (first run excluded)
* Measured

  ```python
  process_creation_time = time.monotonic() - process_start_time
  ```

  immediately after `self._start_new_processes()`

**Results**

“Before parsing loop” vs. “before fork”

* Moving the pre-import outside the parsing loop reduced `process_creation_time` by **92.8%** compared to doing it before each fork.

Baseline vs. “before parsing loop”

* No pre-import at all was about 29% faster than pre-importing before the loop, but the actual time saved was tiny, probably just normal fork timing noise.

**Conclusion**

If my experimental setup is correct, and pre-importing before the parse loop doesn’t introduce any unintended side effects, it might offer an opportunity to improve efficiency.
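[Editor's note] For readers, a minimal sketch of how the `process_creation_time` measurement above could be wired up. This is illustrative only: the harness and the `manager` object (standing in for a `DagFileProcessorManager`-like instance) are assumptions, not part of the PR.

```python
import time

# Hypothetical benchmark harness for the measurement described above.
def measure_process_creation(manager, iterations=1000):
    timings = []
    for i in range(iterations):
        process_start_time = time.monotonic()
        manager._start_new_processes()  # forks a DAG file processor subprocess
        process_creation_time = time.monotonic() - process_start_time
        if i > 0:  # first run excluded, as in the setup above
            timings.append(process_creation_time)
    return sum(timings) / len(timings)  # mean creation time per fork
```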
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lee-W commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2978686419

> I’m not entirely sure, but if we want to pre-load all the core Airflow modules—instead of, as before, pre-loading only the modules required by the current dag file before forking—should we place the pre-import function before the parse loop? This way, we can avoid redundant imports.

Probably will need to benchmark again to see whether we want it. I feel it probably wouldn't affect much? Not sure
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lzzz666 commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2977085285

Correct me if I’m wrong, but if we want to pre-load all the core Airflow modules, we should place the pre-import function before the parse loop. This way, we can avoid redundant imports.
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
jason810496 commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2150048695

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -253,6 +281,15 @@ def start(  # type: ignore[override]
         client: Client,
         **kwargs,
     ) -> Self:
+        # Pre-import Airflow modules in parent process to save CPU time and memory
+        # This prevents them from being re-imported from zero in each processing subprocess
+        import structlog
+
+        log = structlog.get_logger(logger_name="dag_processor")

Review Comment:
My mistake: before the move to the line before `proc: Self = super().start()`, we could get `log` from the function's arguments. After moving it there, IMO it's fine to use `structlog.get_logger` to retrieve the logger, and that will resolve most of the test errors.
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
jason810496 commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2150016679

##########
airflow-core/src/airflow/configuration.py:
##########
@@ -347,6 +347,7 @@ def sensitive_config_values(self) -> set[tuple[str, str]]:
     # DeprecationWarning will be issued and the old option will be used instead
     deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
         ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"),
+        ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.1.0"),

Review Comment:
nit: Should we move this row to the bottom? The entries are currently ordered by change version.
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
jason810496 commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2150048695

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -253,6 +281,15 @@ def start(  # type: ignore[override]
         client: Client,
         **kwargs,
     ) -> Self:
+        # Pre-import Airflow modules in parent process to save CPU time and memory
+        # This prevents them from being re-imported from zero in each processing subprocess
+        import structlog
+
+        log = structlog.get_logger(logger_name="dag_processor")

Review Comment:
My mistake: before moving the `_pre_import_airflow_modules` call to the line before `proc: Self = super().start()`, we could get `log` from the function's arguments. After moving it there, IMO, it's fine to use `structlog.get_logger` to retrieve the logger, and that will resolve most of the test errors.
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
ephraimbuddy commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2149589600

##########
airflow-core/src/airflow/configuration.py:
##########
@@ -347,6 +347,7 @@ def sensitive_config_values(self) -> set[tuple[str, str]]:
     # DeprecationWarning will be issued and the old option will be used instead
     deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
         ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"),
+        ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.2"),

Review Comment:
```suggestion
        ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.1.0"),
```
Let's plan this for 3.1.0 since it's not a bug fix but an improvement.
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
ephraimbuddy commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2149257046

##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -2485,3 +2485,13 @@ dag_processor:
       type: integer
      example: ~
      default: "10"
+    parsing_pre_import_modules:

Review Comment:
You can delete the scheduler section of this and update this list: https://github.com/apache/airflow/blob/3222862aafd41936f9152f770851e09ca77c7e3f/airflow-core/src/airflow/configuration.py#L348
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lzzz666 commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2975058197

I found that with the current pre_import implementation (#30495), the performance gains are negligible, probably because it only pre-imports the Airflow modules actually used in each DAG. In theory this should still speed things up, but my benchmarks didn’t show any improvement.

However, when I modified the pre_import function to preload only the “heavier” third-party libraries (NumPy, pandas, Kubernetes, and Celery), the speed-up became very noticeable.

All of my tests involved parsing the same DAG ten times and measuring `run_duration` (defined as `run_duration = time.monotonic() - proc.start_time`).

1. First test with the original pre-import method
2. Second test with the original pre-import method
3. Modified pre-import that only includes NumPy, pandas, Kubernetes, and Celery
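[Editor's note] For context, the modified pre-import used in the third test could look roughly like this. This is a sketch under stated assumptions: the hard-coded module list and the `_pre_import_heavy_modules` name are illustrative, not the actual patch.

```python
import importlib

# Illustrative: preload a fixed set of heavy third-party libraries in the
# parent process so forked parsing subprocesses inherit them already loaded.
HEAVY_MODULES = ["numpy", "pandas", "kubernetes", "celery"]  # assumed list

def _pre_import_heavy_modules(log) -> None:
    for module in HEAVY_MODULES:
        try:
            importlib.import_module(module)
        except ModuleNotFoundError as e:
            log.warning("Could not pre-import module '%s': %s", module, e)
```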
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
jason810496 commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2148335724

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -253,6 +281,15 @@ def start(  # type: ignore[override]
         client: Client,
         **kwargs,
     ) -> Self:
+        # Pre-import Airflow modules in parent process to save CPU time and memory
+        # This prevents them from being re-imported from zero in each processing subprocess
+        import structlog
+
+        log = structlog.get_logger(logger_name="dag_processor")

Review Comment:
I think we can directly use the `log` passed in as an argument instead of getting the logger again.
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
jason810496 commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2148335724

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -253,6 +281,15 @@ def start(  # type: ignore[override]
         client: Client,
         **kwargs,
     ) -> Self:
+        # Pre-import Airflow modules in parent process to save CPU time and memory
+        # This prevents them from being re-imported from zero in each processing subprocess
+        import structlog
+
+        log = structlog.get_logger(logger_name="dag_processor")

Review Comment:
I think we can directly use the `log` passed in as an argument instead of getting the logger again.

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -99,6 +101,31 @@ class DagFileParsingResult(BaseModel):
     ]
 
 
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> None:
+    """
+    Pre-import Airflow modules found in the given file.
+
+    This prevents modules from being re-imported in each processing process,
+    saving CPU time and memory.
+
+    Args:
+        file_path: Path to the file to scan for imports
+        log: Logger instance to use for warnings
+
+    parsing_pre_import_modules:
+        default value is True
+    """
+    if not conf.getboolean("dag_processor", "parsing_pre_import_modules", fallback=True):
+        return
+
+    log.info(f"[Debug by lz] dag_processor.parsing_pre_import_modules is {conf.getboolean('dag_processor', 'parsing_pre_import_modules', fallback=True)}")

Review Comment:
```suggestion
```
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lzzz666 commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2971286687

I’ll refer to the experiment in #30495 and provide benchmark results.
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
ephraimbuddy commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2969785085

> Correct me if I’m wrong, but when I traced the code, it appears that `proc: Self = super().start()` forks the parent process to create the child process. If the goal of the pre-import is to load modules in the parent before forking, it might make more sense to place it before the `proc: Self = super().start`.
>
> ```js
> DagFileProcessorManager.run()
> └─ run_parsing_loop()
>    └─ while loop:
>       └─ start_new_processes()
>          └─ create_process(dag_path)
>             └─ DagFileProcessorProcess.start()
>                └─ proc: Self = super().start(target=target, client=client, **kwargs) //fork
> ```

Your trace is right, but I feel we should have a verifiable reason to implement the pre-import. If you can spend some time and showcase the issue, I believe it'll help all of us in finding the reason to re-implement this pre-import.
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lzzz666 commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2969437961

Correct me if I’m wrong, but when I traced the code, it appears that `proc: Self = super().start()` forks the parent process to create the child process. If the goal of the pre-import is to load modules in the parent before forking, it might make more sense to place it before the `proc: Self = super().start()`.

```javascript
DagFileProcessorManager.run()
└─ run_parsing_loop()
   └─ while loop:
      └─ start_new_processes()
         └─ create_process(dag_path)
            └─ DagFileProcessorProcess.start()
               └─ proc: Self = super().start(target=target, client=client, **kwargs) //fork
```
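[Editor's note] To illustrate why placement relative to the fork matters, here is a small self-contained demo using plain `os.fork` (POSIX only). Airflow's supervisor machinery is abstracted away; the `pre_import` helper name and the stand-in module list are invented for illustration.

```python
import importlib
import os
import sys

def pre_import(modules, log=print):
    # Runs in the parent: anything imported here is inherited by every fork.
    for name in modules:
        try:
            importlib.import_module(name)
        except ModuleNotFoundError as exc:
            log(f"could not pre-import {name}: {exc}")

pre_import(["json", "decimal"])  # stand-ins for heavy modules like numpy/pandas

pid = os.fork()  # the rough equivalent of what super().start() does
if pid == 0:
    # Child process: the modules are already in sys.modules, no re-import cost.
    print("json" in sys.modules, "decimal" in sys.modules)  # -> True True
    os._exit(0)
os.waitpid(pid, 0)
```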
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lzzz666 commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2968908245

Hi @ephraimbuddy, I’m not familiar with the scheduler, but based on the [Dag File Processing](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/dagfile-processing.html) documentation, I think the pre-import function is executed in the DagFileProcessorProcess stage, which runs in the parent process before forking. So I tried adding the pre-import to `parse_file()`; I now realize that’s not the right place.
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
ephraimbuddy commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2963883416

> > @Lzzz666 would you be able to take a look at the review comments on this PR?
>
> @amoghrajesh I might need some help or discussion regarding the location of the pre-import!

Could you clarify what you mean by the “location of the pre-import”? Implementing pre-import in Airflow 2 doesn’t automatically require us to carry it over to Airflow 3. Airflow 3 uses a dedicated standalone DAG processor, whereas Airflow 2 still straddled scheduler-based and standalone DAG parsing. Before moving forward, can you point out the specific issue you’re seeing, or any error messages?
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lee-W commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2137327413

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -94,8 +96,35 @@ def _parse_file_entrypoint():
     comms_decoder.send_request(log, result)
 
 
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> None:
+    """
+    Pre-import Airflow modules found in the given file.
+
+    This prevents modules from being re-imported in each processing process,
+    saving CPU time and memory.
+
+    Args:
+        file_path: Path to the file to scan for imports
+        log: Logger instance to use for warnings
+
+    parsing_pre_import_modules:
+        default value is True
+    """
+    if not conf.getboolean("scheduler", "parsing_pre_import_modules", fallback=True):

Review Comment:
I think what @ephraimbuddy means is something like `if not conf.getboolean("dag_processor", "parsing_pre_import_modules", fallback=True):`
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lzzz666 commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2958254220

> @Lzzz666 would you be able to take a look at the review comments on this PR?

@amoghrajesh I might need some help or discussion regarding the location of the pre-import!
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
amoghrajesh commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2938813014

@Lzzz666 would you be able to take a look at the review comments on this PR?
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
eladkal commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2102327257

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -94,8 +96,35 @@ def _parse_file_entrypoint():
     comms_decoder.send_request(log, result)
 
 
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> None:
+    """
+    Pre-import Airflow modules found in the given file.
+
+    This prevents modules from being re-imported in each processing process,
+    saving CPU time and memory.
+
+    Args:
+        file_path: Path to the file to scan for imports
+        log: Logger instance to use for warnings
+
+    parsing_pre_import_modules:
+        default value is True
+    """
+    if not conf.getboolean("scheduler", "parsing_pre_import_modules", fallback=True):
+        return
+
+    for module in iter_airflow_imports(file_path):
+        try:
+            importlib.import_module(module)
+        except ModuleNotFoundError as e:
+            log.warning("Error when trying to pre-import module '%s' found in %s: %s", module, file_path, e)

Review Comment:
Correct me if I am wrong, but if an import fails, it means the DAG processor will never try to re-import it until the process is restarted. I think we need to change this. I think this also explains why, in some cases (in 2.x) after a DAG processor restart, I always see many DAGs as broken, with import errors on my own modules.
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lee-W commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2891317453

I just revoked my approval.
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
ephraimbuddy commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2095162188

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -94,8 +96,35 @@ def _parse_file_entrypoint():
     comms_decoder.send_request(log, result)
 
 
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> None:
+    """
+    Pre-import Airflow modules found in the given file.
+
+    This prevents modules from being re-imported in each processing process,
+    saving CPU time and memory.
+
+    Args:
+        file_path: Path to the file to scan for imports
+        log: Logger instance to use for warnings
+
+    parsing_pre_import_modules:
+        default value is True
+    """
+    if not conf.getboolean("scheduler", "parsing_pre_import_modules", fallback=True):
+        return
+
+    for module in iter_airflow_imports(file_path):
+        try:
+            importlib.import_module(module)
+        except ModuleNotFoundError as e:
+            log.warning("Error when trying to pre-import module '%s' found in %s: %s", module, file_path, e)
+
+
 def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult | None:
     # TODO: Set known_pool names on DagBag!
+
+    _pre_import_airflow_modules(msg.file, log)

Review Comment:
Doing this per file seems wrong. It should be at the start of the DAG processor. Have you benchmarked this with the current DAG processor?
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
ephraimbuddy commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2095147877

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -94,8 +96,35 @@ def _parse_file_entrypoint():
     comms_decoder.send_request(log, result)
 
 
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> None:
+    """
+    Pre-import Airflow modules found in the given file.
+
+    This prevents modules from being re-imported in each processing process,
+    saving CPU time and memory.
+
+    Args:
+        file_path: Path to the file to scan for imports
+        log: Logger instance to use for warnings
+
+    parsing_pre_import_modules:
+        default value is True
+    """
+    if not conf.getboolean("scheduler", "parsing_pre_import_modules", fallback=True):

Review Comment:
This should be a dag processor config, not a scheduler one.
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lee-W commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2890044105

@jedcunningham @ephraimbuddy I'm not sure whether this was missed or intentionally removed, but it looks good to me. I'm planning on merging it early tomorrow.
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lee-W commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2095094174

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -94,8 +96,35 @@ def _parse_file_entrypoint():
     comms_decoder.send_request(log, result)
 
 
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> None:
+    """
+    Pre-import Airflow modules found in the given file.
+
+    This prevents modules from being re-imported in each processing process,
+    saving CPU time and memory.
+
+    Args:
+        file_path: Path to the file to scan for imports
+        log: Logger instance to use for warnings
+
+    parsing_pre_import_modules:
+        default value is True

Review Comment:
```suggestion
```

##########
airflow-core/tests/unit/dag_processing/test_processor.py:
##########
@@ -367,6 +373,61 @@ def test_import_module_in_bundle_root(self, tmp_path: pathlib.Path, inprocess_cl
         assert result.import_errors == {}
         assert result.serialized_dags[0].dag_id == "dag_name"
 
+    def test__pre_import_airflow_modules_when_disabled(self, mock_logger):
+        with (
+            patch("airflow.configuration.conf.getboolean", return_value=False),

Review Comment:
making it a fixture would probably make the tests easier to read

##########
airflow-core/tests/unit/dag_processing/test_processor.py:
##########
@@ -367,6 +373,61 @@ def test_import_module_in_bundle_root(self, tmp_path: pathlib.Path, inprocess_cl
         assert result.import_errors == {}
         assert result.serialized_dags[0].dag_id == "dag_name"
 
+    def test__pre_import_airflow_modules_when_disabled(self, mock_logger):
+        with (
+            patch("airflow.configuration.conf.getboolean", return_value=False),
+            patch("airflow.dag_processing.processor.iter_airflow_imports") as mock_iter,
+        ):
+            _pre_import_airflow_modules("test.py", mock_logger)
+
+            mock_iter.assert_not_called()
+            mock_logger.warning.assert_not_called()
+
+    def test__pre_import_airflow_modules_when_enabled(self, mock_logger):
+        with (
+            patch("airflow.configuration.conf.getboolean", return_value=True),
+            patch("airflow.dag_processing.processor.iter_airflow_imports", return_value=["airflow.models"]),
+            patch("airflow.dag_processing.processor.importlib.import_module") as mock_import,
+        ):
+            _pre_import_airflow_modules("test.py", mock_logger)
+
+            mock_import.assert_called_once_with("airflow.models")
+            mock_logger.warning.assert_not_called()
+
+    def test__pre_import_airflow_modules_warns_on_missing_module(self, mock_logger):
+        with (
+            patch("airflow.configuration.conf.getboolean", return_value=True),

Review Comment:
same here

##########
airflow-core/tests/unit/dag_processing/test_processor.py:
##########
@@ -367,6 +373,61 @@ def test_import_module_in_bundle_root(self, tmp_path: pathlib.Path, inprocess_cl
         assert result.import_errors == {}
         assert result.serialized_dags[0].dag_id == "dag_name"
 
+    def test__pre_import_airflow_modules_when_disabled(self, mock_logger):
+        with (
+            patch("airflow.configuration.conf.getboolean", return_value=False),

Review Comment:
after a second thought, something like https://github.com/apache/airflow/blob/2156fbe00320218b071c76f57b527a8794f71c9c/airflow-core/tests/unit/jobs/test_scheduler_job.py#L116 might make more sense

##########
airflow-core/tests/unit/dag_processing/test_processor.py:
##########
@@ -367,6 +373,61 @@ def test_import_module_in_bundle_root(self, tmp_path: pathlib.Path, inprocess_cl
         assert result.import_errors == {}
         assert result.serialized_dags[0].dag_id == "dag_name"
 
+    def test__pre_import_airflow_modules_when_disabled(self, mock_logger):
+        with (
+            patch("airflow.configuration.conf.getboolean", return_value=False),
+            patch("airflow.dag_processing.processor.iter_airflow_imports") as mock_iter,
+        ):
+            _pre_import_airflow_modules("test.py", mock_logger)
+
+            mock_iter.assert_not_called()
+            mock_logger.warning.assert_not_called()
+
+    def test__pre_import_airflow_modules_when_enabled(self, mock_logger):
+        with (
+            patch("airflow.configuration.conf.getboolean", return_value=True),

Review Comment:
same here

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -94,8 +96,35 @@ def _parse_file_entrypoint():
     comms_decoder.send_request(log, result)
 
 
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> None:
+    """
+    Pre-import Airflow modules found in the given file.
+
+    This prevents modules from being re-imported in each processing process,
+    saving CPU time and memory.

Review Comment:
```suggestion
    saving CPU time and memory. (The default v
```
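[Editor's note] For illustration, the fixture approach suggested above might look like the following hypothetical sketch; the `parsing_pre_import_enabled` name is invented, not from the PR.

```python
import pytest
from unittest.mock import patch

# Hypothetical fixture replacing the repeated conf.getboolean patching
# in each test; tests would take it as an argument instead.
@pytest.fixture
def parsing_pre_import_enabled():
    with patch("airflow.configuration.conf.getboolean", return_value=True) as mock_getboolean:
        yield mock_getboolean
```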
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lzzz666 commented on PR #50371: URL: https://github.com/apache/airflow/pull/50371#issuecomment-2875259461

> Also we might need to add a test for it

I’ll try adding a unit test for it.
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lee-W commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2084836196

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -94,6 +96,21 @@ def _parse_file_entrypoint():
 
 def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult | None:
     # TODO: Set known_pool names on DagBag!
+
+    # Pre-import modules
+    # Read the file to pre-import airflow modules used.
+    # This prevents them from being re-imported from zero in each "processing" process
+    # and saves CPU time and memory.
+
+    if conf.getboolean("scheduler", "parsing_pre_import_modules", fallback=True):
+        for module in iter_airflow_imports(msg.file):
+            try:
+                importlib.import_module(module)
+            except Exception as e:

Review Comment:
```suggestion
            except Exception as e:
```
We probably should use a concrete exception here instead.
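[Editor's note] For reference, the concrete-exception variant that later revisions of this PR adopt (see the hunks earlier in this thread) looks roughly like this. The standalone function shape and the `modules` parameter (standing in for `iter_airflow_imports(file_path)`) are assumptions for illustration.

```python
import importlib

def pre_import_modules(modules, file_path, log) -> None:
    for module in modules:
        try:
            importlib.import_module(module)
        except ModuleNotFoundError as e:
            # Only missing modules are tolerated; other import-time errors propagate.
            log.warning("Error when trying to pre-import module '%s' found in %s: %s", module, file_path, e)
```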
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
jason810496 commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2083387680

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -65,6 +68,8 @@
     Field(discriminator="type"),
 ]
 
+console = Console()

Review Comment:
nit: Is this an accidental import?
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
jason810496 commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2082772825

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -94,6 +94,18 @@ def _parse_file_entrypoint():
 
 def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult | None:
     # TODO: Set known_pool names on DagBag!
+    # Pre-import modules
+    if conf.getboolean("scheduler", "parsing_pre_import_modules", fallback=True):
+        import importlib
+
+        from airflow.utils.file import iter_airflow_imports
+
+    for module in iter_airflow_imports(msg.file):

Review Comment:
I just double-checked the `for module in ...` part with https://github.com/apache/airflow/pull/30495/files#diff-3101df8e7c54fe1b9305774befb54b31edfe84cb81425a300e70142bc07637d4R192-R208
Yes, we should add one more indentation.

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -94,6 +94,18 @@ def _parse_file_entrypoint():
 
 def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult | None:
     # TODO: Set known_pool names on DagBag!
+    # Pre-import modules
+    if conf.getboolean("scheduler", "parsing_pre_import_modules", fallback=True):
+        import importlib
+
+        from airflow.utils.file import iter_airflow_imports

Review Comment:
nit: I think we can move the import to the top level? Also, the comments in #30495 are useful; we can port them back as well.
Re: [PR] [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) [airflow]
Lee-W commented on code in PR #50371: URL: https://github.com/apache/airflow/pull/50371#discussion_r2081758552

##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -94,6 +94,18 @@ def _parse_file_entrypoint():
 
 def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult | None:
     # TODO: Set known_pool names on DagBag!
+    # Pre-import modules
+    if conf.getboolean("scheduler", "parsing_pre_import_modules", fallback=True):
+        import importlib
+
+        from airflow.utils.file import iter_airflow_imports
+
+    for module in iter_airflow_imports(msg.file):

Review Comment:
Not really familiar with this area, but the indentation here looks weird 🤔 Should we add one more indentation?