This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b442a26186 Fix process leaks in `GitDagBundle` repository management (#54997)
3b442a26186 is described below

commit 3b442a261867af3b8fabf65642b72c27d4e93d3b
Author: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com>
AuthorDate: Thu Aug 28 01:39:04 2025 -0600

    Fix process leaks in `GitDagBundle` repository management (#54997)
    
    Fix leftover git cat-file processes from GitDagBundle.
    GitDagBundle was leaving behind `git cat-file --batch-check` processes
    after operations, causing process accumulation over time. This occurred
    because Git repository objects were not being properly closed after
    repo operations.
---
 providers/git/src/airflow/providers/git/bundles/git.py |  7 ++++++-
 providers/git/tests/unit/git/bundles/test_git.py       | 16 ++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/providers/git/src/airflow/providers/git/bundles/git.py b/providers/git/src/airflow/providers/git/bundles/git.py
index 28241ab045d..643985176bb 100644
--- a/providers/git/src/airflow/providers/git/bundles/git.py
+++ b/providers/git/src/airflow/providers/git/bundles/git.py
@@ -93,6 +93,7 @@ class GitDagBundle(BaseDagBundle):
             with cm:
                 self._clone_bare_repo_if_required()
                 self._ensure_version_in_bare_repo()
+            self.bare_repo.close()
 
             self._clone_repo_if_required()
             self.repo.git.checkout(self.tracking_ref)
@@ -104,6 +105,7 @@ class GitDagBundle(BaseDagBundle):
                 self.repo.head.reset(index=True, working_tree=True)
             else:
                 self.refresh()
+            self.repo.close()
 
     def initialize(self) -> None:
         if not self.repo_url:
@@ -161,7 +163,8 @@ class GitDagBundle(BaseDagBundle):
         )
 
     def get_current_version(self) -> str:
-        return self.repo.head.commit.hexsha
+        with self.repo as repo:
+            return repo.head.commit.hexsha
 
     @property
     def path(self) -> Path:
@@ -184,6 +187,7 @@ class GitDagBundle(BaseDagBundle):
             cm = self.bare_repo.git.custom_environment(GIT_SSH_COMMAND=cmd)
         with cm:
             self.bare_repo.remotes.origin.fetch(refspecs)
+            self.bare_repo.close()
 
     def refresh(self) -> None:
         if self.version:
@@ -202,6 +206,7 @@ class GitDagBundle(BaseDagBundle):
                 else:
                     target = self.tracking_ref
                 self.repo.head.reset(target, index=True, working_tree=True)
+                self.repo.close()
 
     @staticmethod
     def _convert_git_ssh_url_to_https(url: str) -> str:
diff --git a/providers/git/tests/unit/git/bundles/test_git.py b/providers/git/tests/unit/git/bundles/test_git.py
index 6fc73fcab24..0d7af276b30 100644
--- a/providers/git/tests/unit/git/bundles/test_git.py
+++ b/providers/git/tests/unit/git/bundles/test_git.py
@@ -68,6 +68,14 @@ def git_repo(tmp_path_factory):
     return (directory, repo)
 
 
+def assert_repo_is_closed(bundle: GitDagBundle):
+    # cat-file processes get left around if the repo is not closed, so check it was
+    assert bundle.repo.git.cat_file_all is None
+    assert bundle.bare_repo.git.cat_file_all is None
+    assert bundle.repo.git.cat_file_header is None
+    assert bundle.bare_repo.git.cat_file_header is None
+
+
 class TestGitDagBundle:
     @classmethod
     def teardown_class(cls) -> None:
@@ -137,6 +145,8 @@ class TestGitDagBundle:
 
         assert bundle.get_current_version() == repo.head.commit.hexsha
 
+        assert_repo_is_closed(bundle)
+
     @mock.patch("airflow.providers.git.bundles.git.GitHook")
     def test_get_specific_version(self, mock_githook, git_repo):
         repo_path, repo = git_repo
@@ -163,6 +173,8 @@ class TestGitDagBundle:
         files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"test_dag.py"} == files_in_repo
 
+        assert_repo_is_closed(bundle)
+
     @mock.patch("airflow.providers.git.bundles.git.GitHook")
     def test_get_tag_version(self, mock_githook, git_repo):
         repo_path, repo = git_repo
@@ -211,6 +223,8 @@ class TestGitDagBundle:
         files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"test_dag.py", "new_test.py"} == files_in_repo
 
+        assert_repo_is_closed(bundle)
+
     @pytest.mark.parametrize(
         "amend",
         [
@@ -250,6 +264,8 @@ class TestGitDagBundle:
         files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"test_dag.py", "new_test.py"} == files_in_repo
 
+        assert_repo_is_closed(bundle)
+
     @mock.patch("airflow.providers.git.bundles.git.GitHook")
     def test_refresh_tag(self, mock_githook, git_repo):
         """Ensure that the bundle refresh works when tracking a tag"""

Reply via email to