This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 532069332c2 Fix corrupted bare Git repository recovery in DAG bundles (#56206)
532069332c2 is described below
commit 532069332c22b5d5217fc871aa0b93dcd87a9873
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Thu Oct 16 07:05:08 2025 -0500
Fix corrupted bare Git repository recovery in DAG bundles (#56206)
* Fix corrupted bare Git repository recovery in DAG bundles
When using git DAG bundles, corrupted bare repositories can cause all tasks
landing on a host to fail with InvalidGitRepositoryError. This adds retry
logic that detects corrupted bare repositories, cleans them up, and attempts
to re-clone them once before failing.
Changes:
- Add InvalidGitRepositoryError handling in _clone_bare_repo_if_required()
- Implement cleanup and retry logic with shutil.rmtree()
- Add comprehensive tests for both the successful-retry and retry-failure scenarios
- Ensure all existing tests continue to pass
* Refactor git clone retry logic to use tenacity
* Ephraim's suggestions
---
.../git/src/airflow/providers/git/bundles/git.py | 79 ++++++++++++++++------
providers/git/tests/unit/git/bundles/test_git.py | 66 +++++++++++++++++-
2 files changed, 124 insertions(+), 21 deletions(-)
diff --git a/providers/git/src/airflow/providers/git/bundles/git.py
b/providers/git/src/airflow/providers/git/bundles/git.py
index bd8dba47310..89c7f7af084 100644
--- a/providers/git/src/airflow/providers/git/bundles/git.py
+++ b/providers/git/src/airflow/providers/git/bundles/git.py
@@ -17,13 +17,15 @@
from __future__ import annotations
import os
+import shutil
from contextlib import nullcontext
from pathlib import Path
from urllib.parse import urlparse
import structlog
from git import Repo
-from git.exc import BadName, GitCommandError, NoSuchPathError
+from git.exc import BadName, GitCommandError, InvalidGitRepositoryError,
NoSuchPathError
+from tenacity import retry, retry_if_exception_type, stop_after_attempt
from airflow.dag_processing.bundles.base import BaseDagBundle
from airflow.exceptions import AirflowException
@@ -91,11 +93,21 @@ class GitDagBundle(BaseDagBundle):
with self.lock():
cm = self.hook.configure_hook_env() if self.hook else nullcontext()
with cm:
- self._clone_bare_repo_if_required()
+ try:
+ self._clone_bare_repo_if_required()
+ except GitCommandError as e:
+ raise RuntimeError("Error cloning repository") from e
+ except InvalidGitRepositoryError as e:
+ raise RuntimeError(f"Invalid git repository at
{self.bare_repo_path}") from e
self._ensure_version_in_bare_repo()
self.bare_repo.close()
- self._clone_repo_if_required()
+ try:
+ self._clone_repo_if_required()
+ except GitCommandError as e:
+ raise RuntimeError("Error cloning repository") from e
+ except InvalidGitRepositoryError as e:
+ raise RuntimeError(f"Invalid git repository at
{self.repo_path}") from e
self.repo.git.checkout(self.tracking_ref)
self._log.debug("bundle initialize", version=self.version)
if self.version:
@@ -113,36 +125,65 @@ class GitDagBundle(BaseDagBundle):
self._initialize()
super().initialize()
+ @retry(
+ retry=retry_if_exception_type((InvalidGitRepositoryError,
GitCommandError)),
+ stop=stop_after_attempt(2),
+ reraise=True,
+ )
def _clone_repo_if_required(self) -> None:
- if not os.path.exists(self.repo_path):
- self._log.info("Cloning repository", repo_path=self.repo_path,
bare_repo_path=self.bare_repo_path)
- try:
+ try:
+ if not os.path.exists(self.repo_path):
+ self._log.info(
+ "Cloning repository", repo_path=self.repo_path,
bare_repo_path=self.bare_repo_path
+ )
Repo.clone_from(
url=self.bare_repo_path,
to_path=self.repo_path,
)
- except NoSuchPathError as e:
- # Protection should the bare repo be removed manually
- raise AirflowException("Repository path: %s not found",
self.bare_repo_path) from e
- else:
- self._log.debug("repo exists", repo_path=self.repo_path)
- self.repo = Repo(self.repo_path)
-
+ else:
+ self._log.debug("repo exists", repo_path=self.repo_path)
+ self.repo = Repo(self.repo_path)
+ except NoSuchPathError as e:
+ # Protection should the bare repo be removed manually
+ raise AirflowException("Repository path: %s not found",
self.bare_repo_path) from e
+ except (InvalidGitRepositoryError, GitCommandError) as e:
+ self._log.warning(
+ "Repository clone/open failed, cleaning up and retrying",
+ repo_path=self.repo_path,
+ exc=e,
+ )
+ if os.path.exists(self.repo_path):
+ shutil.rmtree(self.repo_path)
+ raise
+
+ @retry(
+ retry=retry_if_exception_type((InvalidGitRepositoryError,
GitCommandError)),
+ stop=stop_after_attempt(2),
+ reraise=True,
+ )
def _clone_bare_repo_if_required(self) -> None:
if not self.repo_url:
raise AirflowException(f"Connection {self.git_conn_id} doesn't
have a host url")
- if not os.path.exists(self.bare_repo_path):
- self._log.info("Cloning bare repository",
bare_repo_path=self.bare_repo_path)
- try:
+
+ try:
+ if not os.path.exists(self.bare_repo_path):
+ self._log.info("Cloning bare repository",
bare_repo_path=self.bare_repo_path)
Repo.clone_from(
url=self.repo_url,
to_path=self.bare_repo_path,
bare=True,
env=self.hook.env if self.hook else None,
)
- except GitCommandError as e:
- raise AirflowException("Error cloning repository") from e
- self.bare_repo = Repo(self.bare_repo_path)
+ self.bare_repo = Repo(self.bare_repo_path)
+ except (InvalidGitRepositoryError, GitCommandError) as e:
+ self._log.warning(
+ "Bare repository clone/open failed, cleaning up and retrying",
+ bare_repo_path=self.bare_repo_path,
+ exc=e,
+ )
+ if os.path.exists(self.bare_repo_path):
+ shutil.rmtree(self.bare_repo_path)
+ raise
def _ensure_version_in_bare_repo(self) -> None:
if not self.version:
diff --git a/providers/git/tests/unit/git/bundles/test_git.py
b/providers/git/tests/unit/git/bundles/test_git.py
index 950388f6d6c..cf85e71758b 100644
--- a/providers/git/tests/unit/git/bundles/test_git.py
+++ b/providers/git/tests/unit/git/bundles/test_git.py
@@ -26,7 +26,7 @@ from unittest.mock import patch
import pytest
from git import Repo
-from git.exc import GitCommandError, NoSuchPathError
+from git.exc import GitCommandError, InvalidGitRepositoryError, NoSuchPathError
from airflow.dag_processing.bundles.base import get_bundle_storage_root_path
from airflow.exceptions import AirflowException
@@ -653,7 +653,7 @@ class TestGitDagBundle:
mock_clone.side_effect = GitCommandError("clone", "Simulated
error")
bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS,
tracking_ref="main")
with pytest.raises(
- AirflowException,
+ RuntimeError,
match=re.escape("Error cloning repository"),
):
bundle.initialize()
@@ -745,3 +745,65 @@ class TestGitDagBundle:
bundle._clone_bare_repo_if_required()
_, kwargs = mock_gitRepo.clone_from.call_args
assert kwargs["env"] == EXPECTED_ENV
+
+ @mock.patch("airflow.providers.git.bundles.git.GitHook")
+ @mock.patch("airflow.providers.git.bundles.git.shutil.rmtree")
+ @mock.patch("airflow.providers.git.bundles.git.os.path.exists")
+ def test_clone_bare_repo_invalid_repository_error_retry(self, mock_exists,
mock_rmtree, mock_githook):
+ """Test that InvalidGitRepositoryError triggers cleanup and retry."""
+ mock_githook.return_value.repo_url =
"[email protected]:apache/airflow.git"
+ mock_githook.return_value.env = {}
+
+ # Set up exists to return True for the bare repo path (simulating
corrupted repo exists)
+ mock_exists.return_value = True
+
+ with mock.patch("airflow.providers.git.bundles.git.Repo") as
mock_repo_class:
+ # First call to Repo() raises InvalidGitRepositoryError, second
call succeeds
+ mock_repo_class.side_effect = [
+ InvalidGitRepositoryError("Invalid git repository"),
+ mock.MagicMock(), # Second attempt succeeds
+ ]
+
+ # Mock successful clone_from for the retry attempt
+ mock_repo_class.clone_from = mock.MagicMock()
+
+ bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS,
tracking_ref="main")
+
+ # This should not raise an exception due to retry logic
+ bundle._clone_bare_repo_if_required()
+
+ # Verify cleanup was called
+ mock_rmtree.assert_called_once_with(bundle.bare_repo_path)
+
+ # Verify Repo was called twice (failed attempt + retry)
+ assert mock_repo_class.call_count == 2
+
+ @mock.patch("airflow.providers.git.bundles.git.GitHook")
+ @mock.patch("airflow.providers.git.bundles.git.shutil.rmtree")
+ @mock.patch("airflow.providers.git.bundles.git.os.path.exists")
+ def test_clone_bare_repo_invalid_repository_error_retry_fails(
+ self, mock_exists, mock_rmtree, mock_githook
+ ):
+ """Test that InvalidGitRepositoryError after retry is re-raised
(wrapped in AirflowException by caller)."""
+ mock_githook.return_value.repo_url =
"[email protected]:apache/airflow.git"
+ mock_githook.return_value.env = {}
+
+ # Set up exists to return True for the bare repo path
+ mock_exists.return_value = True
+
+ with mock.patch("airflow.providers.git.bundles.git.Repo") as
mock_repo_class:
+ # Both calls to Repo() raise InvalidGitRepositoryError
+ mock_repo_class.side_effect = InvalidGitRepositoryError("Invalid
git repository")
+
+ bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS,
tracking_ref="main")
+
+ # The raw exception is raised by the method itself, but wrapped by
_initialize
+ with pytest.raises(InvalidGitRepositoryError, match="Invalid git
repository"):
+ bundle._clone_bare_repo_if_required()
+
+ # Verify cleanup was called twice (once for each failed attempt)
+ assert mock_rmtree.call_count == 2
+ mock_rmtree.assert_called_with(bundle.bare_repo_path)
+
+ # Verify Repo was called twice (failed attempt + failed retry)
+ assert mock_repo_class.call_count == 2