This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6f998c7c65d FIX: Allow override of HTTP method in run_method of
LivyAsyncHook (#64150)
6f998c7c65d is described below
commit 6f998c7c65d835ab31454159ae31ff0523a8db7b
Author: David Blain <[email protected]>
AuthorDate: Tue Mar 24 17:52:36 2026 +0100
FIX: Allow override of HTTP method in run_method of LivyAsyncHook (#64150)
* refactor: Allow override the method in run_method of LivyAsyncHook
* refactor: Allow override the method in run_method of LivyAsyncHook
* refactor: Removed duplicate test_run_method_success as it is the same as
test_run_get_method_with_success
---
.../airflow/providers/apache/livy/hooks/livy.py | 2 +-
.../livy/tests/unit/apache/livy/hooks/test_livy.py | 27 ++--------------
.../http/src/airflow/providers/http/hooks/http.py | 37 ++++++++++++----------
providers/http/tests/unit/http/hooks/test_http.py | 16 ++++++++++
4 files changed, 39 insertions(+), 43 deletions(-)
diff --git
a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py
b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py
index e9f8e94c748..0b6b5c5c01b 100644
--- a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py
+++ b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py
@@ -523,7 +523,7 @@ class LivyAsyncHook(HttpAsyncHook):
)
try:
- async with self.session() as session:
+ async with self.session(method=method) as session:
response = await session.run(
endpoint=endpoint,
data=data,
diff --git a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py
b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py
index 90bdcad8866..3bd7e1fafb6 100644
--- a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py
+++ b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py
@@ -591,27 +591,6 @@ class TestLivyAsyncHook:
log_dump = await hook.dump_batch_logs(BATCH_ID)
assert log_dump == {"id": 1, "log": ["mock_log_1", "mock_log_2"]}
- @pytest.mark.asyncio
- @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession")
- @mock.patch(
- "airflow.providers.common.compat.connection.get_async_connection",
- return_value=Connection(
- conn_id=LIVY_CONN_ID,
- conn_type="http",
- host="http://host",
- port=80,
- ),
- )
- async def test_run_method_success(self, mock_get_connection, mock_session):
- """Asserts the run_method for success response."""
- mock_session.return_value.__aenter__.return_value.post = AsyncMock()
-
mock_session.return_value.__aenter__.return_value.post.return_value.json =
AsyncMock(
- return_value={"id": 1}
- )
- hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID)
- response = await hook.run_method("localhost", "GET")
- assert response == {"status": "success", "response": {"id": 1}}
-
@pytest.mark.asyncio
async def test_run_method_error(self):
"""Asserts the run_method for error response."""
@@ -659,8 +638,7 @@ class TestLivyAsyncHook:
return_value={"hello": "world"}
)
hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID)
- hook.method = "GET"
- response = await hook.run_method("api/jobs/runs/get")
+ response = await hook.run_method("api/jobs/runs/get", method="GET")
assert response["status"] == "success"
assert response["response"] == {"hello": "world"}
@@ -682,8 +660,7 @@ class TestLivyAsyncHook:
return_value={"hello": "world"}
)
hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID)
- hook.method = "PATCH"
- response = await hook.run_method("api/jobs/runs/get")
+ response = await hook.run_method("api/jobs/runs/get", method="PATCH")
assert response["status"] == "success"
assert response["response"] == {"hello": "world"}
diff --git a/providers/http/src/airflow/providers/http/hooks/http.py
b/providers/http/src/airflow/providers/http/hooks/http.py
index 240bd24d06c..3d22867f265 100644
--- a/providers/http/src/airflow/providers/http/hooks/http.py
+++ b/providers/http/src/airflow/providers/http/hooks/http.py
@@ -453,8 +453,10 @@ class AsyncHttpSession(LoggingMixin):
hook: HttpAsyncHook,
request: Callable[..., Awaitable[ClientResponse]],
config: SessionConfig,
+ method: str | None = None,
) -> None:
super().__init__()
+ self.method = method or hook.method
self._hook = hook
self._request = request
self.config = config
@@ -467,10 +469,6 @@ class AsyncHttpSession(LoggingMixin):
def base_url(self) -> str:
return self.config.base_url
- @property
- def method(self) -> str:
- return self._hook.method
-
@property
def retry_limit(self) -> int:
return self._hook.retry_limit
@@ -587,23 +585,25 @@ class HttpAsyncHook(BaseHook):
self.retry_delay = retry_delay
self._config: SessionConfig | None = None
- def _get_request_func(self, session: aiohttp.ClientSession) ->
Callable[..., Any]:
- method = self.method
- if method == "GET":
+ def _get_request_func(
+ self, session: aiohttp.ClientSession, method: str | None = None
+ ) -> Callable[..., Any]:
+ http_method = method or self.method
+ if http_method == "GET":
return session.get
- if method == "POST":
+ if http_method == "POST":
return session.post
- if method == "PATCH":
+ if http_method == "PATCH":
return session.patch
- if method == "HEAD":
+ if http_method == "HEAD":
return session.head
- if method == "PUT":
+ if http_method == "PUT":
return session.put
- if method == "DELETE":
+ if http_method == "DELETE":
return session.delete
- if method == "OPTIONS":
+ if http_method == "OPTIONS":
return session.options
- raise HttpMethodException(f"Unexpected HTTP Method: {method}")
+ raise HttpMethodException(f"Unexpected HTTP Method: {http_method}")
async def config(self) -> SessionConfig:
if not self._config:
@@ -644,16 +644,19 @@ class HttpAsyncHook(BaseHook):
return self._config
@asynccontextmanager
- async def session(self) -> AsyncGenerator[AsyncHttpSession, None]:
+ async def session(self, method: str | None = None) ->
AsyncGenerator[AsyncHttpSession, None]:
"""
Create an ``AsyncHttpSession`` bound to a single
``aiohttp.ClientSession``.
Airflow connection resolution happens exactly once here.
+
+ :param method: Optional HTTP method to be used for requests made by
the returned session.
+ If provided, this value overrides the hook's configured default
method.
"""
async with aiohttp.ClientSession() as session:
- request = self._get_request_func(session=session)
+ request = self._get_request_func(session=session, method=method)
config = await self.config()
- yield AsyncHttpSession(hook=self, request=request, config=config)
+ yield AsyncHttpSession(hook=self, request=request, config=config,
method=method)
async def run(
self,
diff --git a/providers/http/tests/unit/http/hooks/test_http.py
b/providers/http/tests/unit/http/hooks/test_http.py
index 7bf3ed1f5b9..360c2383930 100644
--- a/providers/http/tests/unit/http/hooks/test_http.py
+++ b/providers/http/tests/unit/http/hooks/test_http.py
@@ -738,6 +738,22 @@ class TestHttpAsyncHook:
async with aiohttp.ClientSession() as session:
await hook.run(session=session,
endpoint="non_existent_endpoint", data=json)
+ @pytest.mark.asyncio
+ async def test_async_get_request(self):
+ """Test api call asynchronously for POST request."""
+ hook = HttpAsyncHook()
+
+ with aioresponses() as m:
+ m.get(
+ "http://test:8080/v1/test",
+ status=200,
+ payload='{"status":{"status": 200}}',
+ reason="OK",
+ )
+ async with hook.session(method="GET") as session:
+ resp = await session.run(endpoint="v1/test")
+ assert resp.status == 200
+
@pytest.mark.asyncio
async def test_async_post_request(self):
"""Test api call asynchronously for POST request."""