vchiapaikeo opened a new issue, #26278:
URL: https://github.com/apache/airflow/issues/26278

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   8.3.0
   
   ### Apache Airflow version
   
   2.3.4
   
   ### Operating System
   
   Debian 11
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
    During routine CloudSQL maintenance, a GCSObjectExistenceSensor failed with the `sqlalchemy.exc.OperationalError` below. While this is somewhat expected and can be mitigated with top-level Airflow `retries` and `retry_delay` policies, we would like the ability to define a more rigorous retry policy on the sensor itself. This ability currently exists for certain GCP operators - for example, the [DataprocCreateClusterOperator](https://github.com/apache/airflow/blob/eb03959e437e11891b8c3696b76f664a991a37a4/airflow/providers/google/cloud/operators/dataproc.py#L468). However, it is not comprehensive. The GCSObjectExistenceSensor is a place where its absence is particularly felt, because these sensors run for a long time and need to be robust.
   
   ```
    [2022-08-31, 16:12:28 UTC] {base_job.py:229} ERROR - LocalTaskJob heartbeat got an exception
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 3141, in _wrap_pool_connect
        return fn()
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 301, in connect
        return _ConnectionFairy._checkout(self)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 755, in _checkout
        fairy = _ConnectionRecord.checkout(pool)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 419, in checkout
        rec = pool._do_get()
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/impl.py", line 259, in _do_get
        return self._create_connection()
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 247, in _create_connection
        return _ConnectionRecord(self)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 362, in __init__
        self.__connect(first_connect_check=True)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 605, in __connect
        pool.logger.debug("Error on connect(): %s", e)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
        compat.raise_(
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
        raise exception
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 599, in __connect
        connection = pool._invoke_creator(self)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/create.py", line 578, in connect
        return dialect.connect(*cargs, **cparams)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 583, in connect
        return self.dbapi.connect(*cargs, **cparams)
      File "/home/airflow/.local/lib/python3.9/site-packages/MySQLdb/__init__.py", line 123, in Connect
        return Connection(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.9/site-packages/MySQLdb/connections.py", line 185, in __init__
        super().__init__(*args, **kwargs2)
    MySQLdb.OperationalError: (2003, "Can't connect to MySQL server on 'XYZ:3306' (111)")
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 201, in heartbeat
        session.merge(self)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2872, in merge
        return self._merge(
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2950, in _merge
        merged = self.get(mapper.class_, key[1], identity_token=key[2])
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2695, in get
        return self._get_impl(
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2796, in _get_impl
        return db_load_fn(
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/loading.py", line 531, in load_on_pk_identity
        session.execute(
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1669, in execute
        conn = self._connection_for_bind(bind, close_with_result=True)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1519, in _connection_for_bind
        return self._transaction._connection_for_bind(
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 747, in _connection_for_bind
        conn = bind.connect()
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 3095, in connect
        return self._connection_cls(self, close_with_result=close_with_result)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 91, in __init__
        else engine.raw_connection()
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 3174, in raw_connection
        return self._wrap_pool_connect(self.pool.connect, _connection)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 3144, in _wrap_pool_connect
        Connection._handle_dbapi_exception_noconnection(
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2003, in _handle_dbapi_exception_noconnection
        util.raise_(
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
        raise exception
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 3141, in _wrap_pool_connect
        return fn()
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 301, in connect
        return _ConnectionFairy._checkout(self)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 755, in _checkout
        fairy = _ConnectionRecord.checkout(pool)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 419, in checkout
        rec = pool._do_get()
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/impl.py", line 259, in _do_get
        return self._create_connection()
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 247, in _create_connection
        return _ConnectionRecord(self)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 362, in __init__
        self.__connect(first_connect_check=True)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 605, in __connect
        pool.logger.debug("Error on connect(): %s", e)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
        compat.raise_(
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
        raise exception
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 599, in __connect
        connection = pool._invoke_creator(self)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/create.py", line 578, in connect
        return dialect.connect(*cargs, **cparams)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 583, in connect
        return self.dbapi.connect(*cargs, **cparams)
      File "/home/airflow/.local/lib/python3.9/site-packages/MySQLdb/__init__.py", line 123, in Connect
        return Connection(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.9/site-packages/MySQLdb/connections.py", line 185, in __init__
        super().__init__(*args, **kwargs2)
    sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2003, "Can't connect to MySQL server on 'XYZ:3306' (111)")
    (Background on this error at: http://sqlalche.me/e/14/e3q8)
    [2022-08-31, 16:12:45 UTC] {gcs.py:81} INFO - Sensor checks existence of : my-bucket-name, vchiapaikeo/trigger.file
   ```
   
   ### What you think should happen instead
   
    Currently, the sensor calls [hook.exists()](https://github.com/apache/airflow/blob/eb03959e437e11891b8c3696b76f664a991a37a4/airflow/providers/google/cloud/sensors/gcs.py#L87) to perform its check. That function uses the [underlying blob resource's exists](https://github.com/apache/airflow/blob/eb03959e437e11891b8c3696b76f664a991a37a4/airflow/providers/google/cloud/hooks/gcs.py#L544-L555) method to check whether the object exists. The google-cloud-storage library's Blob.exists method actually [accepts a retry object](https://github.com/googleapis/python-storage/blob/282e3b605e62c4f8d93debff20d40afbf2e718f5/google/cloud/storage/blob.py#L636) - and if none is passed, it falls back to the [DEFAULT_RETRY constant](https://github.com/googleapis/python-storage/blob/282e3b605e62c4f8d93debff20d40afbf2e718f5/google/cloud/storage/retry.py#L52).
    
    We should allow the retry object to be overridden by exposing a `retry` parameter on the GCSObjectExistenceSensor, which would then be passed through in the sensor's poke implementation (via the hook.exists() call). Something like:
   
   ```py
        def poke(self, context: "Context") -> bool:
            self.log.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
            hook = GCSHook(
                gcp_conn_id=self.google_cloud_conn_id,
                delegate_to=self.delegate_to,
                impersonation_chain=self.impersonation_chain,
            )
            return hook.exists(self.bucket, self.object, retry=self.retry)
   ```
   
    To maintain backwards compatibility, the new parameter should default to the same DEFAULT_RETRY that google-cloud-storage itself uses:
    
    https://github.com/googleapis/python-storage/blob/282e3b605e62c4f8d93debff20d40afbf2e718f5/google/cloud/storage/retry.py#L52
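For context, DEFAULT_RETRY is a `google.api_core.retry.Retry` object that retries transient API errors with capped exponential backoff. The stdlib-only sketch below illustrates that backoff pattern; the function name, delay values, and `TransientError` class are illustrative, not the library's actual implementation or defaults:

```python
import time


class TransientError(Exception):
    """Stand-in for a retryable error, e.g. a 5xx API response."""


def call_with_retry(fn, initial=1.0, maximum=60.0, multiplier=2.0,
                    deadline=120.0, sleep=time.sleep, clock=time.monotonic):
    """Retry fn() on TransientError with capped exponential backoff.

    Delay grows as initial * multiplier**k, capped at `maximum`;
    gives up once the next sleep would exceed the `deadline` budget.
    """
    delay = initial
    start = clock()
    while True:
        try:
            return fn()
        except TransientError:
            if clock() - start + delay > deadline:
                raise  # budget exhausted: surface the last error
            sleep(delay)
            delay = min(delay * multiplier, maximum)


# Example: a call that succeeds on the third attempt (sleeps stubbed out).
attempts = {"n": 0}

def flaky_exists_check():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TransientError("temporary outage")
    return True

result = call_with_retry(flaky_exists_check, sleep=lambda s: None)
print(result)  # -> True, after 2 retried failures
```

A user-supplied `Retry` works the same way, which is why exposing it on the sensor lets callers tune the predicate, delays, and deadline per use case.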
   
   ### How to reproduce
   
    Simple DAG to reproduce. Once the DAG is running, initiate a DB failover.
   
   ```py
   import datetime
   
   from airflow import DAG
    from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
   
   DEFAULT_TASK_ARGS = {
       "owner": "xyz",
       "start_date": "2022-09-01",
       "retries": 3,
       "retry_delay": 300,
       "project_id": "my-project-id",
   }
   
   sensor_count = 50
   
   with DAG(
       schedule_interval="@daily",
       max_active_runs=1,
       max_active_tasks=sensor_count,
       catchup=False,
       dag_id="test_gcs_sensor",
       default_args=DEFAULT_TASK_ARGS,
   ) as dag:
   
       for i in range(sensor_count):
           _ = GCSObjectExistenceSensor(
               bucket="my-bucket-name",
               object="vchiapaikeo/trigger.file",
               retry_delay=(datetime.timedelta(seconds=60)),
               start_date=(datetime.datetime(2021, 4, 16, 0, 0)),
               task_id=f"test_sensor_{i}",
               poke_interval=300,
           )
   ```
   
   ### Anything else
   
    Only reproducible under certain conditions - a DB failover or a similar connectivity interruption is needed.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
    - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

