This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f998c7c65d FIX: Allow override of HTTP method in run_method of LivyAsyncHook (#64150)
6f998c7c65d is described below

commit 6f998c7c65d835ab31454159ae31ff0523a8db7b
Author: David Blain <[email protected]>
AuthorDate: Tue Mar 24 17:52:36 2026 +0100

    FIX: Allow override of HTTP method in run_method of LivyAsyncHook (#64150)
    
    * refactor: Allow override the method in run_method of LivyAsyncHook
    
    * refactor: Allow override the method in run_method of LivyAsyncHook
    
    * refactor: Removed duplicate test_run_method_success as it is the same as test_run_get_method_with_success
---
 .../airflow/providers/apache/livy/hooks/livy.py    |  2 +-
 .../livy/tests/unit/apache/livy/hooks/test_livy.py | 27 ++--------------
 .../http/src/airflow/providers/http/hooks/http.py  | 37 ++++++++++++----------
 providers/http/tests/unit/http/hooks/test_http.py  | 16 ++++++++++
 4 files changed, 39 insertions(+), 43 deletions(-)

diff --git a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py
index e9f8e94c748..0b6b5c5c01b 100644
--- a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py
+++ b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py
@@ -523,7 +523,7 @@ class LivyAsyncHook(HttpAsyncHook):
         )
 
         try:
-            async with self.session() as session:
+            async with self.session(method=method) as session:
                 response = await session.run(
                     endpoint=endpoint,
                     data=data,
diff --git a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py
index 90bdcad8866..3bd7e1fafb6 100644
--- a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py
+++ b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py
@@ -591,27 +591,6 @@ class TestLivyAsyncHook:
         log_dump = await hook.dump_batch_logs(BATCH_ID)
         assert log_dump == {"id": 1, "log": ["mock_log_1", "mock_log_2"]}
 
-    @pytest.mark.asyncio
-    @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession")
-    @mock.patch(
-        "airflow.providers.common.compat.connection.get_async_connection",
-        return_value=Connection(
-            conn_id=LIVY_CONN_ID,
-            conn_type="http",
-            host="http://host",
-            port=80,
-        ),
-    )
-    async def test_run_method_success(self, mock_get_connection, mock_session):
-        """Asserts the run_method for success response."""
-        mock_session.return_value.__aenter__.return_value.post = AsyncMock()
-        mock_session.return_value.__aenter__.return_value.post.return_value.json = AsyncMock(
-            return_value={"id": 1}
-        )
-        hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID)
-        response = await hook.run_method("localhost", "GET")
-        assert response == {"status": "success", "response": {"id": 1}}
-
     @pytest.mark.asyncio
     async def test_run_method_error(self):
         """Asserts the run_method for error response."""
@@ -659,8 +638,7 @@ class TestLivyAsyncHook:
             return_value={"hello": "world"}
         )
         hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID)
-        hook.method = "GET"
-        response = await hook.run_method("api/jobs/runs/get")
+        response = await hook.run_method("api/jobs/runs/get", method="GET")
         assert response["status"] == "success"
         assert response["response"] == {"hello": "world"}
 
@@ -682,8 +660,7 @@ class TestLivyAsyncHook:
             return_value={"hello": "world"}
         )
         hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID)
-        hook.method = "PATCH"
-        response = await hook.run_method("api/jobs/runs/get")
+        response = await hook.run_method("api/jobs/runs/get", method="PATCH")
         assert response["status"] == "success"
         assert response["response"] == {"hello": "world"}
 
diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py
index 240bd24d06c..3d22867f265 100644
--- a/providers/http/src/airflow/providers/http/hooks/http.py
+++ b/providers/http/src/airflow/providers/http/hooks/http.py
@@ -453,8 +453,10 @@ class AsyncHttpSession(LoggingMixin):
         hook: HttpAsyncHook,
         request: Callable[..., Awaitable[ClientResponse]],
         config: SessionConfig,
+        method: str | None = None,
     ) -> None:
         super().__init__()
+        self.method = method or hook.method
         self._hook = hook
         self._request = request
         self.config = config
@@ -467,10 +469,6 @@ class AsyncHttpSession(LoggingMixin):
     def base_url(self) -> str:
         return self.config.base_url
 
-    @property
-    def method(self) -> str:
-        return self._hook.method
-
     @property
     def retry_limit(self) -> int:
         return self._hook.retry_limit
@@ -587,23 +585,25 @@ class HttpAsyncHook(BaseHook):
         self.retry_delay = retry_delay
         self._config: SessionConfig | None = None
 
-    def _get_request_func(self, session: aiohttp.ClientSession) -> Callable[..., Any]:
-        method = self.method
-        if method == "GET":
+    def _get_request_func(
+        self, session: aiohttp.ClientSession, method: str | None = None
+    ) -> Callable[..., Any]:
+        http_method = method or self.method
+        if http_method == "GET":
             return session.get
-        if method == "POST":
+        if http_method == "POST":
             return session.post
-        if method == "PATCH":
+        if http_method == "PATCH":
             return session.patch
-        if method == "HEAD":
+        if http_method == "HEAD":
             return session.head
-        if method == "PUT":
+        if http_method == "PUT":
             return session.put
-        if method == "DELETE":
+        if http_method == "DELETE":
             return session.delete
-        if method == "OPTIONS":
+        if http_method == "OPTIONS":
             return session.options
-        raise HttpMethodException(f"Unexpected HTTP Method: {method}")
+        raise HttpMethodException(f"Unexpected HTTP Method: {http_method}")
 
     async def config(self) -> SessionConfig:
         if not self._config:
@@ -644,16 +644,19 @@ class HttpAsyncHook(BaseHook):
         return self._config
 
     @asynccontextmanager
-    async def session(self) -> AsyncGenerator[AsyncHttpSession, None]:
+    async def session(self, method: str | None = None) -> AsyncGenerator[AsyncHttpSession, None]:
         """
         Create an ``AsyncHttpSession`` bound to a single ``aiohttp.ClientSession``.
 
         Airflow connection resolution happens exactly once here.
+
+        :param method: Optional HTTP method to be used for requests made by the returned session.
+            If provided, this value overrides the hook's configured default method.
         """
         async with aiohttp.ClientSession() as session:
-            request = self._get_request_func(session=session)
+            request = self._get_request_func(session=session, method=method)
             config = await self.config()
-            yield AsyncHttpSession(hook=self, request=request, config=config)
+            yield AsyncHttpSession(hook=self, request=request, config=config, method=method)
 
     async def run(
         self,
diff --git a/providers/http/tests/unit/http/hooks/test_http.py b/providers/http/tests/unit/http/hooks/test_http.py
index 7bf3ed1f5b9..360c2383930 100644
--- a/providers/http/tests/unit/http/hooks/test_http.py
+++ b/providers/http/tests/unit/http/hooks/test_http.py
@@ -738,6 +738,22 @@ class TestHttpAsyncHook:
             async with aiohttp.ClientSession() as session:
                 await hook.run(session=session, endpoint="non_existent_endpoint", data=json)
 
+    @pytest.mark.asyncio
+    async def test_async_get_request(self):
+        """Test api call asynchronously for GET request."""
+        hook = HttpAsyncHook()
+
+        with aioresponses() as m:
+            m.get(
+                "http://test:8080/v1/test",
+                status=200,
+                payload='{"status":{"status": 200}}',
+                reason="OK",
+            )
+            async with hook.session(method="GET") as session:
+                resp = await session.run(endpoint="v1/test")
+                assert resp.status == 200
+
     @pytest.mark.asyncio
     async def test_async_post_request(self):
         """Test api call asynchronously for POST request."""

Reply via email to