This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6f1243a6e5b Add venv pycache clean up for the PythonVirtualenvOperator
(#53390)
6f1243a6e5b is described below
commit 6f1243a6e5bba80a918ab7fc9f4dc2f6beee9330
Author: olegkachur-e <[email protected]>
AuthorDate: Thu Jul 24 18:19:35 2025 +0200
Add venv pycache clean up for the PythonVirtualenvOperator (#53390)
Extend the cleanup logic of the PythonVirtualenvOperator:
the previous implementation cleaned up the venv directory itself,
but left the corresponding pycache directory behind.
Co-authored-by: Oleg Kachur <[email protected]>
---
.../airflow/providers/standard/operators/python.py | 13 ++++++++
.../tests/unit/standard/operators/test_python.py | 36 ++++++++++++++++++++++
2 files changed, 49 insertions(+)
diff --git
a/providers/standard/src/airflow/providers/standard/operators/python.py
b/providers/standard/src/airflow/providers/standard/operators/python.py
index e0276e4ebe2..3ab7b774c10 100644
--- a/providers/standard/src/airflow/providers/standard/operators/python.py
+++ b/providers/standard/src/airflow/providers/standard/operators/python.py
@@ -861,6 +861,15 @@ class
PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
self.log.info("New Python virtual environment created in %s",
venv_path)
return venv_path
+ def _cleanup_python_pycache_dir(self, cache_dir_path: Path) -> None:
+ try:
+ shutil.rmtree(cache_dir_path)
+ self.log.debug("The directory %s has been deleted.",
cache_dir_path)
+ except FileNotFoundError:
+ self.log.warning("Fail to delete %s. The directory does not
exist.", cache_dir_path)
+ except PermissionError:
+ self.log.warning("Permission denied to delete the directory %s.",
cache_dir_path)
+
def _retrieve_index_urls_from_connection_ids(self):
"""Retrieve index URLs from Package Index connections."""
if self.index_urls is None:
@@ -880,9 +889,13 @@ class
PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
with TemporaryDirectory(prefix="venv") as tmp_dir:
tmp_path = Path(tmp_dir)
+ tmp_dir, temp_venv_dir =
tmp_path.relative_to(tmp_path.anchor).parts
+ custom_pycache_prefix = Path(sys.pycache_prefix or "")
+ venv_python_cache_dir = Path.cwd() / custom_pycache_prefix /
tmp_dir / temp_venv_dir
self._prepare_venv(tmp_path)
python_path = tmp_path / "bin" / "python"
result = self._execute_python_callable_in_subprocess(python_path)
+ self._cleanup_python_pycache_dir(venv_python_cache_dir)
return result
def _iter_serializable_context_keys(self):
diff --git a/providers/standard/tests/unit/standard/operators/test_python.py
b/providers/standard/tests/unit/standard/operators/test_python.py
index 95c503aed70..3369a712da0 100644
--- a/providers/standard/tests/unit/standard/operators/test_python.py
+++ b/providers/standard/tests/unit/standard/operators/test_python.py
@@ -1660,6 +1660,42 @@ class
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
msg = str(exc_info.value)
assert f"Invalid requirement '{invalid_requirement}'" in msg
+
@mock.patch("airflow.providers.standard.operators.python.PythonVirtualenvOperator._prepare_venv")
+ @mock.patch(
+
"airflow.providers.standard.operators.python.PythonVirtualenvOperator._execute_python_callable_in_subprocess"
+ )
+ @mock.patch(
+
"airflow.providers.standard.operators.python.PythonVirtualenvOperator._cleanup_python_pycache_dir"
+ )
+ def test_execute_callable_pycache_cleanup(
+ self, pycache_cleanup_mock, execute_in_subprocess_mock,
prepare_venv_mock
+ ):
+ custom_pycache_prefix = "custom/__pycache__"
+ tempdir_name = "tmp"
+ venv_dir_temp_name = "venvrandom"
+ venv_path_tmp = f"/{tempdir_name}/{venv_dir_temp_name}"
+ expected_cleanup_path = Path.cwd() / custom_pycache_prefix /
tempdir_name / venv_dir_temp_name
+
+ def f():
+ return 1
+
+ op = PythonVirtualenvOperator(
+ task_id="task",
+ python_callable=f,
+ system_site_packages=False,
+ )
+ with mock.patch.object(sys, "pycache_prefix",
new=custom_pycache_prefix):
+ with mock.patch(
+
"airflow.providers.standard.operators.python.TemporaryDirectory"
+ ) as mock_temp_dir:
+ mock_context = mock_temp_dir.return_value.__enter__
+ mock_context.return_value = venv_path_tmp
+ op.execute_callable()
+
+ execute_in_subprocess_mock.assert_called_once()
+ prepare_venv_mock.assert_called_once_with(Path(venv_path_tmp))
+ pycache_cleanup_mock.assert_called_once_with(expected_cleanup_path)
+
# when venv tests are run in parallel to other test they create new processes
and this might take
# quite some time in shared docker environment and get some contention even
between different containers