This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 3b442a26186 Fix process leaks in `GitDagBundle` repository management (#54997)
3b442a26186 is described below

commit 3b442a261867af3b8fabf65642b72c27d4e93d3b
Author: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com>
AuthorDate: Thu Aug 28 01:39:04 2025 -0600

    Fix process leaks in `GitDagBundle` repository management (#54997)

    Fix leftover git cat-file processes from GitDagBundle.

    GitDagBundle was leaving behind `git cat-file --batch-check` processes
    after operations, causing process accumulation over time. This occurred
    because Git repository objects were not being properly closed after
    repo operations.
---
 providers/git/src/airflow/providers/git/bundles/git.py |  7 ++++++-
 providers/git/tests/unit/git/bundles/test_git.py       | 16 ++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/providers/git/src/airflow/providers/git/bundles/git.py b/providers/git/src/airflow/providers/git/bundles/git.py index 28241ab045d..643985176bb 100644 --- a/providers/git/src/airflow/providers/git/bundles/git.py +++ b/providers/git/src/airflow/providers/git/bundles/git.py @@ -93,6 +93,7 @@ class GitDagBundle(BaseDagBundle): with cm: self._clone_bare_repo_if_required() self._ensure_version_in_bare_repo() + self.bare_repo.close() self._clone_repo_if_required() self.repo.git.checkout(self.tracking_ref) @@ -104,6 +105,7 @@ class GitDagBundle(BaseDagBundle): self.repo.head.reset(index=True, working_tree=True) else: self.refresh() + self.repo.close() def initialize(self) -> None: if not self.repo_url: @@ -161,7 +163,8 @@ class GitDagBundle(BaseDagBundle): ) def get_current_version(self) -> str: - return self.repo.head.commit.hexsha + with self.repo as repo: + return repo.head.commit.hexsha @property def path(self) -> Path: @@ -184,6 +187,7 @@ class GitDagBundle(BaseDagBundle): cm = self.bare_repo.git.custom_environment(GIT_SSH_COMMAND=cmd) with cm: 
self.bare_repo.remotes.origin.fetch(refspecs) + self.bare_repo.close() def refresh(self) -> None: if self.version: @@ -202,6 +206,7 @@ class GitDagBundle(BaseDagBundle): else: target = self.tracking_ref self.repo.head.reset(target, index=True, working_tree=True) + self.repo.close() @staticmethod def _convert_git_ssh_url_to_https(url: str) -> str: diff --git a/providers/git/tests/unit/git/bundles/test_git.py b/providers/git/tests/unit/git/bundles/test_git.py index 6fc73fcab24..0d7af276b30 100644 --- a/providers/git/tests/unit/git/bundles/test_git.py +++ b/providers/git/tests/unit/git/bundles/test_git.py @@ -68,6 +68,14 @@ def git_repo(tmp_path_factory): return (directory, repo) +def assert_repo_is_closed(bundle: GitDagBundle): + # cat-file processes get left around if the repo is not closed, so check it was + assert bundle.repo.git.cat_file_all is None + assert bundle.bare_repo.git.cat_file_all is None + assert bundle.repo.git.cat_file_header is None + assert bundle.bare_repo.git.cat_file_header is None + + class TestGitDagBundle: @classmethod def teardown_class(cls) -> None: @@ -137,6 +145,8 @@ class TestGitDagBundle: assert bundle.get_current_version() == repo.head.commit.hexsha + assert_repo_is_closed(bundle) + @mock.patch("airflow.providers.git.bundles.git.GitHook") def test_get_specific_version(self, mock_githook, git_repo): repo_path, repo = git_repo @@ -163,6 +173,8 @@ class TestGitDagBundle: files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()} assert {"test_dag.py"} == files_in_repo + assert_repo_is_closed(bundle) + @mock.patch("airflow.providers.git.bundles.git.GitHook") def test_get_tag_version(self, mock_githook, git_repo): repo_path, repo = git_repo @@ -211,6 +223,8 @@ class TestGitDagBundle: files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()} assert {"test_dag.py", "new_test.py"} == files_in_repo + assert_repo_is_closed(bundle) + @pytest.mark.parametrize( "amend", [ @@ -250,6 +264,8 @@ class TestGitDagBundle: 
files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()} assert {"test_dag.py", "new_test.py"} == files_in_repo + assert_repo_is_closed(bundle) + @mock.patch("airflow.providers.git.bundles.git.GitHook") def test_refresh_tag(self, mock_githook, git_repo): """Ensure that the bundle refresh works when tracking a tag"""