This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f1243a6e5b Add venv pycache clean up for the PythonVirtualenvOperator (#53390)
6f1243a6e5b is described below

commit 6f1243a6e5bba80a918ab7fc9f4dc2f6beee9330
Author: olegkachur-e <[email protected]>
AuthorDate: Thu Jul 24 18:19:35 2025 +0200

    Add venv pycache clean up for the PythonVirtualenvOperator (#53390)
    
    Extend cleanup logic for the PythonVirtualenvOperator,
    as previous implementation cleans up the venv directory itself,
    but kept the pycache trace.
    
    Co-authored-by: Oleg Kachur <[email protected]>
---
 .../airflow/providers/standard/operators/python.py | 13 ++++++++
 .../tests/unit/standard/operators/test_python.py   | 36 ++++++++++++++++++++++
 2 files changed, 49 insertions(+)

diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py
index e0276e4ebe2..3ab7b774c10 100644
--- a/providers/standard/src/airflow/providers/standard/operators/python.py
+++ b/providers/standard/src/airflow/providers/standard/operators/python.py
@@ -861,6 +861,15 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
             self.log.info("New Python virtual environment created in %s", venv_path)
             return venv_path
 
+    def _cleanup_python_pycache_dir(self, cache_dir_path: Path) -> None:
+        try:
+            shutil.rmtree(cache_dir_path)
+            self.log.debug("The directory %s has been deleted.", cache_dir_path)
+        except FileNotFoundError:
+            self.log.warning("Fail to delete %s. The directory does not exist.", cache_dir_path)
+        except PermissionError:
+            self.log.warning("Permission denied to delete the directory %s.", cache_dir_path)
+
     def _retrieve_index_urls_from_connection_ids(self):
         """Retrieve index URLs from Package Index connections."""
         if self.index_urls is None:
@@ -880,9 +889,13 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
 
         with TemporaryDirectory(prefix="venv") as tmp_dir:
             tmp_path = Path(tmp_dir)
+            tmp_dir, temp_venv_dir = tmp_path.relative_to(tmp_path.anchor).parts
+            custom_pycache_prefix = Path(sys.pycache_prefix or "")
+            venv_python_cache_dir = Path.cwd() / custom_pycache_prefix / tmp_dir / temp_venv_dir
             self._prepare_venv(tmp_path)
             python_path = tmp_path / "bin" / "python"
             result = self._execute_python_callable_in_subprocess(python_path)
+            self._cleanup_python_pycache_dir(venv_python_cache_dir)
             return result
 
     def _iter_serializable_context_keys(self):
diff --git a/providers/standard/tests/unit/standard/operators/test_python.py b/providers/standard/tests/unit/standard/operators/test_python.py
index 95c503aed70..3369a712da0 100644
--- a/providers/standard/tests/unit/standard/operators/test_python.py
+++ b/providers/standard/tests/unit/standard/operators/test_python.py
@@ -1660,6 +1660,42 @@ class TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
         msg = str(exc_info.value)
         assert f"Invalid requirement '{invalid_requirement}'" in msg
 
+    @mock.patch("airflow.providers.standard.operators.python.PythonVirtualenvOperator._prepare_venv")
+    @mock.patch(
+        "airflow.providers.standard.operators.python.PythonVirtualenvOperator._execute_python_callable_in_subprocess"
+    )
+    @mock.patch(
+        "airflow.providers.standard.operators.python.PythonVirtualenvOperator._cleanup_python_pycache_dir"
+    )
+    def test_execute_callable_pycache_cleanup(
+        self, pycache_cleanup_mock, execute_in_subprocess_mock, prepare_venv_mock
+    ):
+        custom_pycache_prefix = "custom/__pycache__"
+        tempdir_name = "tmp"
+        venv_dir_temp_name = "venvrandom"
+        venv_path_tmp = f"/{tempdir_name}/{venv_dir_temp_name}"
+        expected_cleanup_path = Path.cwd() / custom_pycache_prefix / tempdir_name / venv_dir_temp_name
+
+        def f():
+            return 1
+
+        op = PythonVirtualenvOperator(
+            task_id="task",
+            python_callable=f,
+            system_site_packages=False,
+        )
+        with mock.patch.object(sys, "pycache_prefix", new=custom_pycache_prefix):
+            with mock.patch(
+                "airflow.providers.standard.operators.python.TemporaryDirectory"
+            ) as mock_temp_dir:
+                mock_context = mock_temp_dir.return_value.__enter__
+                mock_context.return_value = venv_path_tmp
+                op.execute_callable()
+
+        execute_in_subprocess_mock.assert_called_once()
+        prepare_venv_mock.assert_called_once_with(Path(venv_path_tmp))
+        pycache_cleanup_mock.assert_called_once_with(expected_cleanup_path)
+
 
 # when venv tests are run in parallel to other test they create new processes and this might take
 # quite some time in shared docker environment and get some contention even between different containers

Reply via email to