astro-anand commented on code in PR #51463: URL: https://github.com/apache/airflow/pull/51463#discussion_r2143009820
########## providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py: ########## @@ -300,10 +307,89 @@ async def get_sql_api_query_status_async(self, query_id: str) -> dict[str, str | """ self.log.info("Retrieving status for query id %s", query_id) header, params, url = self.get_request_url_header_params(query_id) - async with ( - aiohttp.ClientSession(headers=header) as session, - session.get(url, params=params) as response, + status_code, resp = await self._make_api_call_with_retries_async("GET", url, header, params) + return self._process_response(status_code, resp) + + @staticmethod + def _should_retry_on_error(exception) -> bool: + """ + Determine if the exception should trigger a retry based on error type and status code. + + Retries on HTTP errors 429 (Too Many Requests), 503 (Service Unavailable), + and 504 (Gateway Timeout) as recommended by Snowflake error handling docs. + Retries on connection errors and timeouts. + + :param exception: The exception to check + :return: True if the request should be retried, False otherwise + """ + if isinstance(exception, HTTPError): + return exception.response.status_code in [429, 503, 504] + if isinstance(exception, ClientResponseError): + return exception.status in [429, 503, 504] + if isinstance( + exception, + ( + ConnectionError, + Timeout, + ClientConnectionError, + ), ): - status_code = response.status - resp = await response.json() - return self._process_response(status_code, resp) + return True + return False + + def _make_api_call_with_retries(self, method, url, headers, params=None, data=None): + """ + Make an API call to the Snowflake SQL API with retry logic for specific HTTP errors. + + Error handling implemented based on Snowflake error handling docs: + https://docs.snowflake.com/en/developer-guide/sql-api/handling-errors + + :param method: The HTTP method to use for the API call. + :param url: The URL for the API endpoint. + :param headers: The headers to include in the API call. + :param params: (Optional) The query parameters to include in the API call. + :param data: (Optional) The data to include in the API call. + :return: The response object from the API call. + """ + + @tenacity.retry(**self.retry_config) # Use the retry args defined in constructor + def _make_request(): + if method.upper() == "GET": + response = requests.get(url, headers=headers, params=params) + elif method.upper() == "POST": + response = requests.post(url, headers=headers, params=params, json=data) + else: + raise ValueError(f"Unsupported HTTP method: {method}") + response.raise_for_status() + return response.status_code, response.json() + + return _make_request() Review Comment: @mik-laj any thoughts on the desired approach? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org