Lee-W commented on code in PR #50371:
URL: https://github.com/apache/airflow/pull/50371#discussion_r2095094174
##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -94,8 +96,35 @@ def _parse_file_entrypoint():
comms_decoder.send_request(log, result)
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) ->
None:
+ """
+ Pre-import Airflow modules found in the given file.
+
+ This prevents modules from being re-imported in each processing process,
+ saving CPU time and memory.
+
+ Args:
+ file_path: Path to the file to scan for imports
+ log: Logger instance to use for warnings
+
+ parsing_pre_import_modules:
+ default value is True
Review Comment:
```suggestion
```
##########
airflow-core/tests/unit/dag_processing/test_processor.py:
##########
@@ -367,6 +373,61 @@ def test_import_module_in_bundle_root(self, tmp_path:
pathlib.Path, inprocess_cl
assert result.import_errors == {}
assert result.serialized_dags[0].dag_id == "dag_name"
+ def test__pre_import_airflow_modules_when_disabled(self, mock_logger):
+ with (
+ patch("airflow.configuration.conf.getboolean", return_value=False),
Review Comment:
Making this patch a fixture would probably make the tests easier to read.
##########
airflow-core/tests/unit/dag_processing/test_processor.py:
##########
@@ -367,6 +373,61 @@ def test_import_module_in_bundle_root(self, tmp_path:
pathlib.Path, inprocess_cl
assert result.import_errors == {}
assert result.serialized_dags[0].dag_id == "dag_name"
+ def test__pre_import_airflow_modules_when_disabled(self, mock_logger):
+ with (
+ patch("airflow.configuration.conf.getboolean", return_value=False),
+ patch("airflow.dag_processing.processor.iter_airflow_imports") as
mock_iter,
+ ):
+ _pre_import_airflow_modules("test.py", mock_logger)
+
+ mock_iter.assert_not_called()
+ mock_logger.warning.assert_not_called()
+
+ def test__pre_import_airflow_modules_when_enabled(self, mock_logger):
+ with (
+ patch("airflow.configuration.conf.getboolean", return_value=True),
+ patch("airflow.dag_processing.processor.iter_airflow_imports",
return_value=["airflow.models"]),
+ patch("airflow.dag_processing.processor.importlib.import_module")
as mock_import,
+ ):
+ _pre_import_airflow_modules("test.py", mock_logger)
+
+ mock_import.assert_called_once_with("airflow.models")
+ mock_logger.warning.assert_not_called()
+
+ def test__pre_import_airflow_modules_warns_on_missing_module(self,
mock_logger):
+ with (
+ patch("airflow.configuration.conf.getboolean", return_value=True),
Review Comment:
same here
##########
airflow-core/tests/unit/dag_processing/test_processor.py:
##########
@@ -367,6 +373,61 @@ def test_import_module_in_bundle_root(self, tmp_path:
pathlib.Path, inprocess_cl
assert result.import_errors == {}
assert result.serialized_dags[0].dag_id == "dag_name"
+ def test__pre_import_airflow_modules_when_disabled(self, mock_logger):
+ with (
+ patch("airflow.configuration.conf.getboolean", return_value=False),
Review Comment:
On second thought, something like
https://github.com/apache/airflow/blob/2156fbe00320218b071c76f57b527a8794f71c9c/airflow-core/tests/unit/jobs/test_scheduler_job.py#L116
might make more sense.
##########
airflow-core/tests/unit/dag_processing/test_processor.py:
##########
@@ -367,6 +373,61 @@ def test_import_module_in_bundle_root(self, tmp_path:
pathlib.Path, inprocess_cl
assert result.import_errors == {}
assert result.serialized_dags[0].dag_id == "dag_name"
+ def test__pre_import_airflow_modules_when_disabled(self, mock_logger):
+ with (
+ patch("airflow.configuration.conf.getboolean", return_value=False),
+ patch("airflow.dag_processing.processor.iter_airflow_imports") as
mock_iter,
+ ):
+ _pre_import_airflow_modules("test.py", mock_logger)
+
+ mock_iter.assert_not_called()
+ mock_logger.warning.assert_not_called()
+
+ def test__pre_import_airflow_modules_when_enabled(self, mock_logger):
+ with (
+ patch("airflow.configuration.conf.getboolean", return_value=True),
Review Comment:
same here
##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -94,8 +96,35 @@ def _parse_file_entrypoint():
comms_decoder.send_request(log, result)
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) ->
None:
+ """
+ Pre-import Airflow modules found in the given file.
+
+ This prevents modules from being re-imported in each processing process,
+ saving CPU time and memory.
Review Comment:
```suggestion
saving CPU time and memory.
(The default value of "parsing_pre_import_modules" is set to True)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]