This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 532069332c2 Fix corrupted bare Git repository recovery in DAG bundles (#56206)
532069332c2 is described below

commit 532069332c22b5d5217fc871aa0b93dcd87a9873
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Thu Oct 16 07:05:08 2025 -0500

    Fix corrupted bare Git repository recovery in DAG bundles (#56206)
    
    * Fix corrupted bare Git repository recovery in DAG bundles
    
      When using git DAG bundles, corrupted bare repositories can cause all tasks
      landing on a host to fail with InvalidGitRepositoryError. This adds retry
      logic that detects corrupted bare repositories, cleans them up, and attempts
      to re-clone them once before failing.
    
      Changes:
      - Add InvalidGitRepositoryError handling in _clone_bare_repo_if_required()
      - Implement cleanup and retry logic with shutil.rmtree()
      - Add comprehensive tests for both successful retry and retry failure scenarios
      - Ensure all existing tests continue to pass
    
    * Refactor git clone retry logic to use tenacity
    
    * Ephraim's suggestions
---
 .../git/src/airflow/providers/git/bundles/git.py   | 79 ++++++++++++++++------
 providers/git/tests/unit/git/bundles/test_git.py   | 66 +++++++++++++++++-
 2 files changed, 124 insertions(+), 21 deletions(-)

diff --git a/providers/git/src/airflow/providers/git/bundles/git.py 
b/providers/git/src/airflow/providers/git/bundles/git.py
index bd8dba47310..89c7f7af084 100644
--- a/providers/git/src/airflow/providers/git/bundles/git.py
+++ b/providers/git/src/airflow/providers/git/bundles/git.py
@@ -17,13 +17,15 @@
 from __future__ import annotations
 
 import os
+import shutil
 from contextlib import nullcontext
 from pathlib import Path
 from urllib.parse import urlparse
 
 import structlog
 from git import Repo
-from git.exc import BadName, GitCommandError, NoSuchPathError
+from git.exc import BadName, GitCommandError, InvalidGitRepositoryError, 
NoSuchPathError
+from tenacity import retry, retry_if_exception_type, stop_after_attempt
 
 from airflow.dag_processing.bundles.base import BaseDagBundle
 from airflow.exceptions import AirflowException
@@ -91,11 +93,21 @@ class GitDagBundle(BaseDagBundle):
         with self.lock():
             cm = self.hook.configure_hook_env() if self.hook else nullcontext()
             with cm:
-                self._clone_bare_repo_if_required()
+                try:
+                    self._clone_bare_repo_if_required()
+                except GitCommandError as e:
+                    raise RuntimeError("Error cloning repository") from e
+                except InvalidGitRepositoryError as e:
+                    raise RuntimeError(f"Invalid git repository at 
{self.bare_repo_path}") from e
                 self._ensure_version_in_bare_repo()
             self.bare_repo.close()
 
-            self._clone_repo_if_required()
+            try:
+                self._clone_repo_if_required()
+            except GitCommandError as e:
+                raise RuntimeError("Error cloning repository") from e
+            except InvalidGitRepositoryError as e:
+                raise RuntimeError(f"Invalid git repository at 
{self.repo_path}") from e
             self.repo.git.checkout(self.tracking_ref)
             self._log.debug("bundle initialize", version=self.version)
             if self.version:
@@ -113,36 +125,65 @@ class GitDagBundle(BaseDagBundle):
         self._initialize()
         super().initialize()
 
+    @retry(
+        retry=retry_if_exception_type((InvalidGitRepositoryError, 
GitCommandError)),
+        stop=stop_after_attempt(2),
+        reraise=True,
+    )
     def _clone_repo_if_required(self) -> None:
-        if not os.path.exists(self.repo_path):
-            self._log.info("Cloning repository", repo_path=self.repo_path, 
bare_repo_path=self.bare_repo_path)
-            try:
+        try:
+            if not os.path.exists(self.repo_path):
+                self._log.info(
+                    "Cloning repository", repo_path=self.repo_path, 
bare_repo_path=self.bare_repo_path
+                )
                 Repo.clone_from(
                     url=self.bare_repo_path,
                     to_path=self.repo_path,
                 )
-            except NoSuchPathError as e:
-                # Protection should the bare repo be removed manually
-                raise AirflowException("Repository path: %s not found", 
self.bare_repo_path) from e
-        else:
-            self._log.debug("repo exists", repo_path=self.repo_path)
-        self.repo = Repo(self.repo_path)
-
+            else:
+                self._log.debug("repo exists", repo_path=self.repo_path)
+            self.repo = Repo(self.repo_path)
+        except NoSuchPathError as e:
+            # Protection should the bare repo be removed manually
+            raise AirflowException("Repository path: %s not found", 
self.bare_repo_path) from e
+        except (InvalidGitRepositoryError, GitCommandError) as e:
+            self._log.warning(
+                "Repository clone/open failed, cleaning up and retrying",
+                repo_path=self.repo_path,
+                exc=e,
+            )
+            if os.path.exists(self.repo_path):
+                shutil.rmtree(self.repo_path)
+            raise
+
+    @retry(
+        retry=retry_if_exception_type((InvalidGitRepositoryError, 
GitCommandError)),
+        stop=stop_after_attempt(2),
+        reraise=True,
+    )
     def _clone_bare_repo_if_required(self) -> None:
         if not self.repo_url:
             raise AirflowException(f"Connection {self.git_conn_id} doesn't 
have a host url")
-        if not os.path.exists(self.bare_repo_path):
-            self._log.info("Cloning bare repository", 
bare_repo_path=self.bare_repo_path)
-            try:
+
+        try:
+            if not os.path.exists(self.bare_repo_path):
+                self._log.info("Cloning bare repository", 
bare_repo_path=self.bare_repo_path)
                 Repo.clone_from(
                     url=self.repo_url,
                     to_path=self.bare_repo_path,
                     bare=True,
                     env=self.hook.env if self.hook else None,
                 )
-            except GitCommandError as e:
-                raise AirflowException("Error cloning repository") from e
-        self.bare_repo = Repo(self.bare_repo_path)
+            self.bare_repo = Repo(self.bare_repo_path)
+        except (InvalidGitRepositoryError, GitCommandError) as e:
+            self._log.warning(
+                "Bare repository clone/open failed, cleaning up and retrying",
+                bare_repo_path=self.bare_repo_path,
+                exc=e,
+            )
+            if os.path.exists(self.bare_repo_path):
+                shutil.rmtree(self.bare_repo_path)
+            raise
 
     def _ensure_version_in_bare_repo(self) -> None:
         if not self.version:
diff --git a/providers/git/tests/unit/git/bundles/test_git.py 
b/providers/git/tests/unit/git/bundles/test_git.py
index 950388f6d6c..cf85e71758b 100644
--- a/providers/git/tests/unit/git/bundles/test_git.py
+++ b/providers/git/tests/unit/git/bundles/test_git.py
@@ -26,7 +26,7 @@ from unittest.mock import patch
 
 import pytest
 from git import Repo
-from git.exc import GitCommandError, NoSuchPathError
+from git.exc import GitCommandError, InvalidGitRepositoryError, NoSuchPathError
 
 from airflow.dag_processing.bundles.base import get_bundle_storage_root_path
 from airflow.exceptions import AirflowException
@@ -653,7 +653,7 @@ class TestGitDagBundle:
             mock_clone.side_effect = GitCommandError("clone", "Simulated 
error")
             bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, 
tracking_ref="main")
             with pytest.raises(
-                AirflowException,
+                RuntimeError,
                 match=re.escape("Error cloning repository"),
             ):
                 bundle.initialize()
@@ -745,3 +745,65 @@ class TestGitDagBundle:
             bundle._clone_bare_repo_if_required()
             _, kwargs = mock_gitRepo.clone_from.call_args
             assert kwargs["env"] == EXPECTED_ENV
+
+    @mock.patch("airflow.providers.git.bundles.git.GitHook")
+    @mock.patch("airflow.providers.git.bundles.git.shutil.rmtree")
+    @mock.patch("airflow.providers.git.bundles.git.os.path.exists")
+    def test_clone_bare_repo_invalid_repository_error_retry(self, mock_exists, 
mock_rmtree, mock_githook):
+        """Test that InvalidGitRepositoryError triggers cleanup and retry."""
+        mock_githook.return_value.repo_url = 
"[email protected]:apache/airflow.git"
+        mock_githook.return_value.env = {}
+
+        # Set up exists to return True for the bare repo path (simulating 
corrupted repo exists)
+        mock_exists.return_value = True
+
+        with mock.patch("airflow.providers.git.bundles.git.Repo") as 
mock_repo_class:
+            # First call to Repo() raises InvalidGitRepositoryError, second 
call succeeds
+            mock_repo_class.side_effect = [
+                InvalidGitRepositoryError("Invalid git repository"),
+                mock.MagicMock(),  # Second attempt succeeds
+            ]
+
+            # Mock successful clone_from for the retry attempt
+            mock_repo_class.clone_from = mock.MagicMock()
+
+            bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, 
tracking_ref="main")
+
+            # This should not raise an exception due to retry logic
+            bundle._clone_bare_repo_if_required()
+
+            # Verify cleanup was called
+            mock_rmtree.assert_called_once_with(bundle.bare_repo_path)
+
+            # Verify Repo was called twice (failed attempt + retry)
+            assert mock_repo_class.call_count == 2
+
+    @mock.patch("airflow.providers.git.bundles.git.GitHook")
+    @mock.patch("airflow.providers.git.bundles.git.shutil.rmtree")
+    @mock.patch("airflow.providers.git.bundles.git.os.path.exists")
+    def test_clone_bare_repo_invalid_repository_error_retry_fails(
+        self, mock_exists, mock_rmtree, mock_githook
+    ):
+        """Test that InvalidGitRepositoryError after retry is re-raised 
(wrapped in AirflowException by caller)."""
+        mock_githook.return_value.repo_url = 
"[email protected]:apache/airflow.git"
+        mock_githook.return_value.env = {}
+
+        # Set up exists to return True for the bare repo path
+        mock_exists.return_value = True
+
+        with mock.patch("airflow.providers.git.bundles.git.Repo") as 
mock_repo_class:
+            # Both calls to Repo() raise InvalidGitRepositoryError
+            mock_repo_class.side_effect = InvalidGitRepositoryError("Invalid 
git repository")
+
+            bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, 
tracking_ref="main")
+
+            # The raw exception is raised by the method itself, but wrapped by 
_initialize
+            with pytest.raises(InvalidGitRepositoryError, match="Invalid git 
repository"):
+                bundle._clone_bare_repo_if_required()
+
+            # Verify cleanup was called twice (once for each failed attempt)
+            assert mock_rmtree.call_count == 2
+            mock_rmtree.assert_called_with(bundle.bare_repo_path)
+
+            # Verify Repo was called twice (failed attempt + failed retry)
+            assert mock_repo_class.call_count == 2

Reply via email to