Lee-W commented on code in PR #45228:
URL: https://github.com/apache/airflow/pull/45228#discussion_r1900488657
##########
providers/src/airflow/providers/http/hooks/http.py:
##########
@@ -410,54 +411,53 @@ async def run(
url = _url_from_endpoint(self.base_url, endpoint)
- async with aiohttp.ClientSession() as session:
- if self.method == "GET":
- request_func = session.get
- elif self.method == "POST":
- request_func = session.post
- elif self.method == "PATCH":
- request_func = session.patch
- elif self.method == "HEAD":
- request_func = session.head
- elif self.method == "PUT":
- request_func = session.put
- elif self.method == "DELETE":
- request_func = session.delete
- elif self.method == "OPTIONS":
- request_func = session.options
- else:
- raise AirflowException(f"Unexpected HTTP Method:
{self.method}")
-
- for attempt in range(1, 1 + self.retry_limit):
- response = await request_func(
+ if self.method == "GET":
+ request_func = session.get
+ elif self.method == "POST":
+ request_func = session.post
+ elif self.method == "PATCH":
+ request_func = session.patch
+ elif self.method == "HEAD":
+ request_func = session.head
+ elif self.method == "PUT":
+ request_func = session.put
+ elif self.method == "DELETE":
+ request_func = session.delete
+ elif self.method == "OPTIONS":
+ request_func = session.options
+ else:
+ raise AirflowException(f"Unexpected HTTP Method: {self.method}")
+
+ for attempt in range(1, 1 + self.retry_limit):
+ response = await request_func(
+ url,
+ params=data if self.method == "GET" else None,
+ data=data if self.method in ("POST", "PUT", "PATCH") else None,
+ json=json,
+ headers=_headers,
+ auth=auth,
+ **extra_options,
+ )
+ try:
+ response.raise_for_status()
+ except ClientResponseError as e:
+ self.log.warning(
+ "[Try %d of %d] Request to %s failed.",
+ attempt,
+ self.retry_limit,
url,
- params=data if self.method == "GET" else None,
- data=data if self.method in ("POST", "PUT", "PATCH") else
None,
- json=json,
- headers=_headers,
- auth=auth,
- **extra_options,
)
- try:
- response.raise_for_status()
- except ClientResponseError as e:
- self.log.warning(
- "[Try %d of %d] Request to %s failed.",
- attempt,
- self.retry_limit,
- url,
- )
- if not self._retryable_error_async(e) or attempt ==
self.retry_limit:
- self.log.exception("HTTP error with status: %s",
e.status)
- # In this case, the user probably made a mistake.
- # Don't retry.
- raise AirflowException(f"{e.status}:{e.message}")
- else:
- await asyncio.sleep(self.retry_delay)
+ if not self._retryable_error_async(e) or attempt ==
self.retry_limit:
+ self.log.exception("HTTP error with status: %s", e.status)
+ # In this case, the user probably made a mistake.
+ # Don't retry.
+ raise AirflowException(f"{e.status}:{e.message}")
Review Comment:
Same here
##########
providers/src/airflow/providers/http/hooks/http.py:
##########
@@ -410,54 +411,53 @@ async def run(
url = _url_from_endpoint(self.base_url, endpoint)
- async with aiohttp.ClientSession() as session:
- if self.method == "GET":
- request_func = session.get
- elif self.method == "POST":
- request_func = session.post
- elif self.method == "PATCH":
- request_func = session.patch
- elif self.method == "HEAD":
- request_func = session.head
- elif self.method == "PUT":
- request_func = session.put
- elif self.method == "DELETE":
- request_func = session.delete
- elif self.method == "OPTIONS":
- request_func = session.options
- else:
- raise AirflowException(f"Unexpected HTTP Method:
{self.method}")
-
- for attempt in range(1, 1 + self.retry_limit):
- response = await request_func(
+ if self.method == "GET":
+ request_func = session.get
+ elif self.method == "POST":
+ request_func = session.post
+ elif self.method == "PATCH":
+ request_func = session.patch
+ elif self.method == "HEAD":
+ request_func = session.head
+ elif self.method == "PUT":
+ request_func = session.put
+ elif self.method == "DELETE":
+ request_func = session.delete
+ elif self.method == "OPTIONS":
+ request_func = session.options
+ else:
+ raise AirflowException(f"Unexpected HTTP Method: {self.method}")
+
+ for attempt in range(1, 1 + self.retry_limit):
+ response = await request_func(
+ url,
+ params=data if self.method == "GET" else None,
+ data=data if self.method in ("POST", "PUT", "PATCH") else None,
+ json=json,
+ headers=_headers,
+ auth=auth,
+ **extra_options,
+ )
+ try:
+ response.raise_for_status()
+ except ClientResponseError as e:
+ self.log.warning(
+ "[Try %d of %d] Request to %s failed.",
+ attempt,
+ self.retry_limit,
url,
- params=data if self.method == "GET" else None,
- data=data if self.method in ("POST", "PUT", "PATCH") else
None,
- json=json,
- headers=_headers,
- auth=auth,
- **extra_options,
)
- try:
- response.raise_for_status()
- except ClientResponseError as e:
- self.log.warning(
- "[Try %d of %d] Request to %s failed.",
- attempt,
- self.retry_limit,
- url,
- )
- if not self._retryable_error_async(e) or attempt ==
self.retry_limit:
- self.log.exception("HTTP error with status: %s",
e.status)
- # In this case, the user probably made a mistake.
- # Don't retry.
- raise AirflowException(f"{e.status}:{e.message}")
- else:
- await asyncio.sleep(self.retry_delay)
+ if not self._retryable_error_async(e) or attempt ==
self.retry_limit:
+ self.log.exception("HTTP error with status: %s", e.status)
+ # In this case, the user probably made a mistake.
+ # Don't retry.
+ raise AirflowException(f"{e.status}:{e.message}")
else:
- return response
+ await asyncio.sleep(self.retry_delay)
Review Comment:
```suggestion
await asyncio.sleep(self.retry_delay)
```
##########
providers/src/airflow/providers/http/hooks/http.py:
##########
@@ -410,54 +411,53 @@ async def run(
url = _url_from_endpoint(self.base_url, endpoint)
- async with aiohttp.ClientSession() as session:
- if self.method == "GET":
- request_func = session.get
- elif self.method == "POST":
- request_func = session.post
- elif self.method == "PATCH":
- request_func = session.patch
- elif self.method == "HEAD":
- request_func = session.head
- elif self.method == "PUT":
- request_func = session.put
- elif self.method == "DELETE":
- request_func = session.delete
- elif self.method == "OPTIONS":
- request_func = session.options
- else:
- raise AirflowException(f"Unexpected HTTP Method:
{self.method}")
-
- for attempt in range(1, 1 + self.retry_limit):
- response = await request_func(
+ if self.method == "GET":
+ request_func = session.get
+ elif self.method == "POST":
+ request_func = session.post
+ elif self.method == "PATCH":
+ request_func = session.patch
+ elif self.method == "HEAD":
+ request_func = session.head
+ elif self.method == "PUT":
+ request_func = session.put
+ elif self.method == "DELETE":
+ request_func = session.delete
+ elif self.method == "OPTIONS":
+ request_func = session.options
+ else:
+ raise AirflowException(f"Unexpected HTTP Method: {self.method}")
+
+ for attempt in range(1, 1 + self.retry_limit):
+ response = await request_func(
+ url,
+ params=data if self.method == "GET" else None,
+ data=data if self.method in ("POST", "PUT", "PATCH") else None,
+ json=json,
+ headers=_headers,
+ auth=auth,
+ **extra_options,
+ )
+ try:
+ response.raise_for_status()
+ except ClientResponseError as e:
+ self.log.warning(
+ "[Try %d of %d] Request to %s failed.",
+ attempt,
+ self.retry_limit,
url,
- params=data if self.method == "GET" else None,
- data=data if self.method in ("POST", "PUT", "PATCH") else
None,
- json=json,
- headers=_headers,
- auth=auth,
- **extra_options,
)
- try:
- response.raise_for_status()
- except ClientResponseError as e:
- self.log.warning(
- "[Try %d of %d] Request to %s failed.",
- attempt,
- self.retry_limit,
- url,
- )
- if not self._retryable_error_async(e) or attempt ==
self.retry_limit:
- self.log.exception("HTTP error with status: %s",
e.status)
- # In this case, the user probably made a mistake.
- # Don't retry.
- raise AirflowException(f"{e.status}:{e.message}")
- else:
- await asyncio.sleep(self.retry_delay)
+ if not self._retryable_error_async(e) or attempt ==
self.retry_limit:
+ self.log.exception("HTTP error with status: %s", e.status)
+ # In this case, the user probably made a mistake.
+ # Don't retry.
+ raise AirflowException(f"{e.status}:{e.message}")
else:
Review Comment:
else might not be needed
##########
providers/src/airflow/providers/http/hooks/http.py:
##########
@@ -410,54 +411,53 @@ async def run(
url = _url_from_endpoint(self.base_url, endpoint)
- async with aiohttp.ClientSession() as session:
- if self.method == "GET":
- request_func = session.get
- elif self.method == "POST":
- request_func = session.post
- elif self.method == "PATCH":
- request_func = session.patch
- elif self.method == "HEAD":
- request_func = session.head
- elif self.method == "PUT":
- request_func = session.put
- elif self.method == "DELETE":
- request_func = session.delete
- elif self.method == "OPTIONS":
- request_func = session.options
- else:
- raise AirflowException(f"Unexpected HTTP Method:
{self.method}")
-
- for attempt in range(1, 1 + self.retry_limit):
- response = await request_func(
+ if self.method == "GET":
+ request_func = session.get
+ elif self.method == "POST":
+ request_func = session.post
+ elif self.method == "PATCH":
+ request_func = session.patch
+ elif self.method == "HEAD":
+ request_func = session.head
+ elif self.method == "PUT":
+ request_func = session.put
+ elif self.method == "DELETE":
+ request_func = session.delete
+ elif self.method == "OPTIONS":
+ request_func = session.options
+ else:
+ raise AirflowException(f"Unexpected HTTP Method: {self.method}")
Review Comment:
It would be better if we could create a customized exception inherited from
AirflowExpection. This exception can be added into
`providers/src/airflow/providers/http/exceptions.py`
##########
providers/src/airflow/providers/http/hooks/http.py:
##########
@@ -410,54 +411,53 @@ async def run(
url = _url_from_endpoint(self.base_url, endpoint)
- async with aiohttp.ClientSession() as session:
- if self.method == "GET":
- request_func = session.get
- elif self.method == "POST":
- request_func = session.post
- elif self.method == "PATCH":
- request_func = session.patch
- elif self.method == "HEAD":
- request_func = session.head
- elif self.method == "PUT":
- request_func = session.put
- elif self.method == "DELETE":
- request_func = session.delete
- elif self.method == "OPTIONS":
- request_func = session.options
- else:
- raise AirflowException(f"Unexpected HTTP Method:
{self.method}")
-
- for attempt in range(1, 1 + self.retry_limit):
- response = await request_func(
+ if self.method == "GET":
+ request_func = session.get
+ elif self.method == "POST":
+ request_func = session.post
+ elif self.method == "PATCH":
+ request_func = session.patch
+ elif self.method == "HEAD":
+ request_func = session.head
+ elif self.method == "PUT":
+ request_func = session.put
+ elif self.method == "DELETE":
+ request_func = session.delete
+ elif self.method == "OPTIONS":
+ request_func = session.options
+ else:
+ raise AirflowException(f"Unexpected HTTP Method: {self.method}")
+
+ for attempt in range(1, 1 + self.retry_limit):
+ response = await request_func(
+ url,
+ params=data if self.method == "GET" else None,
+ data=data if self.method in ("POST", "PUT", "PATCH") else None,
+ json=json,
+ headers=_headers,
+ auth=auth,
+ **extra_options,
+ )
+ try:
+ response.raise_for_status()
+ except ClientResponseError as e:
+ self.log.warning(
+ "[Try %d of %d] Request to %s failed.",
+ attempt,
+ self.retry_limit,
url,
- params=data if self.method == "GET" else None,
- data=data if self.method in ("POST", "PUT", "PATCH") else
None,
- json=json,
- headers=_headers,
- auth=auth,
- **extra_options,
)
- try:
- response.raise_for_status()
- except ClientResponseError as e:
- self.log.warning(
- "[Try %d of %d] Request to %s failed.",
- attempt,
- self.retry_limit,
- url,
- )
- if not self._retryable_error_async(e) or attempt ==
self.retry_limit:
- self.log.exception("HTTP error with status: %s",
e.status)
- # In this case, the user probably made a mistake.
- # Don't retry.
- raise AirflowException(f"{e.status}:{e.message}")
- else:
- await asyncio.sleep(self.retry_delay)
+ if not self._retryable_error_async(e) or attempt ==
self.retry_limit:
+ self.log.exception("HTTP error with status: %s", e.status)
+ # In this case, the user probably made a mistake.
+ # Don't retry.
+ raise AirflowException(f"{e.status}:{e.message}")
else:
- return response
+ await asyncio.sleep(self.retry_delay)
else:
- raise NotImplementedError # should not reach this, but makes
mypy happy
+ return response
+ else:
+ raise NotImplementedError # should not reach this, but makes mypy
happy
Review Comment:
```suggestion
raise NotImplementedError # should not reach this, but makes mypy
happy
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]