radhwene commented on PR #68361: URL: https://github.com/apache/airflow/pull/68361#issuecomment-4674860196
### E2E evidence: real Cloud SQL PostgreSQL instance, Airflow v2

Below are the reproduction and validation runs that back the “Why hook-level retry, not only a sensor” and “E2E validation” sections.

**Sensor-only approach does not close the race.** I tested the sensor-only approach by placing a `CloudSQLNoOperationInProgressSensor` in `reschedule` mode before each import task. In that run, the sensor task `wait_a` reports **success**, but the immediately downstream `CloudSQLImportInstanceOperator` task `import_complaints` still fails with `409 operationInProgress`. This shows that a time-of-check to time-of-use (TOCTOU) window remains between:

1. the sensor observing the instance as idle;
2. the downstream operator being scheduled;
3. the operator submitting the actual Cloud SQL Admin API request.

If another admin operation starts on the same instance inside this window, the submit is rejected with 409 even though the sensor succeeded.

**Hook-level retry handles the contention at the submit point.** I also validated this PR with a harder topology: four Cloud SQL admin operations (two imports and two exports) submitted in parallel against the same instance, with no sensor. All four tasks succeed because the hook retries the rejected submits, and no `409 operationInProgress` is surfaced to the DAG run.

This validates both patched hook methods, `import_instance` and `export_instance`, under real Cloud SQL per-instance operation serialization.

<details>
<summary>E2E DAG used for the validation (<code>cloudsql_retry_stress_409</code>)</summary>

```python
"""
cloudsql_retry_stress.py

E2E stress test for the apache/airflow#68040 retry-on-409 fix.

This DAG fans out four Cloud SQL admin operations in parallel against the
same Cloud SQL PostgreSQL instance:

    create_tables ──┬──> import_complaints
                    ├──> import_crime
                    ├──> export_a
                    └──> export_b

Cloud SQL serializes admin operations per instance, so simultaneous submits
can collide with HTTP 409 ``operationInProgress``.

Expected behavior:

- stock provider: parallel operations can fail with 409 and the DAG run fails;
- patched provider: import/export submit calls retry through
  ``operation_in_progress_retry``; all four tasks complete successfully.

Export object URIs are unique per run with ``{{ ts_nodash }}`` because
Cloud SQL export fails if the destination GCS object already exists.

Trigger config: PROJECT_ID, INSTANCE, GCS_BUCKET, DB_NAME.
"""

from __future__ import annotations

from datetime import datetime
from pathlib import Path

from airflow import DAG
from airflow.models.param import Param
from airflow.providers.google.cloud.operators.cloud_sql import (
    CloudSQLExportInstanceOperator,
    CloudSQLImportInstanceOperator,
)
from airflow.providers.postgres.operators.postgres import PostgresOperator

INIT_SQL_PATH = Path(__file__).resolve().parent / "init_tables.sql"


def _import(task_id: str, table: str) -> CloudSQLImportInstanceOperator:
    return CloudSQLImportInstanceOperator(
        task_id=task_id,
        project_id="{{ params.PROJECT_ID }}",
        instance="{{ params.INSTANCE }}",
        body={
            "importContext": {
                "fileType": "CSV",
                # Quadruple braces in the f-string render to a literal Jinja "{{ ... }}".
                "uri": f"gs://{{{{ params.GCS_BUCKET }}}}/{table}.csv",
                "database": "{{ params.DB_NAME }}",
                "csvImportOptions": {"table": table},
            }
        },
        gcp_conn_id="google_cloud_default",
    )


def _export(task_id: str, label: str) -> CloudSQLExportInstanceOperator:
    return CloudSQLExportInstanceOperator(
        task_id=task_id,
        project_id="{{ params.PROJECT_ID }}",
        instance="{{ params.INSTANCE }}",
        body={
            "exportContext": {
                "fileType": "CSV",
                # ts_nodash keeps the destination object name unique per run.
                "uri": f"gs://{{{{ params.GCS_BUCKET }}}}/stress_export_{label}_{{{{ ts_nodash }}}}.csv",
                "databases": ["{{ params.DB_NAME }}"],
                "csvExportOptions": {"selectQuery": "SELECT 1 AS col"},
            }
        },
        gcp_conn_id="google_cloud_default",
    )


with DAG(
    dag_id="cloudsql_retry_stress_409",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    tags=["cloudsql", "fix-409", "retry", "stress"],
    params={
        "PROJECT_ID": Param(default="CHANGE_ME", type="string"),
        "INSTANCE": Param(default="CHANGE_ME", type="string"),
        "GCS_BUCKET": Param(default="CHANGE_ME", type="string"),
        "DB_NAME": Param(default="airflow_db", type="string"),
    },
    render_template_as_native_obj=True,
) as dag:
    create_tables = PostgresOperator(
        task_id="create_tables",
        postgres_conn_id="cloudsql_pg",
        sql=INIT_SQL_PATH.read_text(),
    )

    # All four admin operations start in parallel once the tables exist.
    create_tables >> [
        _import("import_complaints", "complaints"),
        _import("import_crime", "crime"),
        _export("export_a", "a"),
        _export("export_b", "b"),
    ]
```

</details>
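
To make the sensor-only TOCTOU gap concrete, here is a minimal, deterministic toy model. It is not the sensor or provider code: `FakeInstance`, `OperationInProgress` and `sensor_then_submit` are hypothetical names, and Cloud SQL's per-instance serialization is reduced to a single `running` slot. The `interleave` callback stands in for whatever other task submits its own operation after the sensor's idle check but before the operator's API call.

```python
from __future__ import annotations

from typing import Callable


class OperationInProgress(Exception):
    """Hypothetical stand-in for the HTTP 409 ``operationInProgress`` response."""


class FakeInstance:
    """Hypothetical model: accepts at most one admin operation at a time."""

    def __init__(self) -> None:
        self.running: str | None = None

    def is_idle(self) -> bool:
        # What a "no operation in progress" sensor observes when it pokes.
        return self.running is None

    def submit(self, op: str) -> None:
        # The server decides at submit time: reject while another op runs.
        if self.running is not None:
            raise OperationInProgress(f"{op} rejected: {self.running} is running")
        self.running = op


def sensor_then_submit(instance: FakeInstance, interleave: Callable[[], None]) -> str:
    """Check idleness first, then submit, with a gap in between."""
    if not instance.is_idle():
        return "sensor still waiting"
    # Gap: the sensor has succeeded, but the downstream operator has not yet
    # been scheduled or called the API. Another task may submit here.
    interleave()
    try:
        instance.submit("our_import")
    except OperationInProgress as exc:
        return f"409 after sensor success: {exc}"
    return "submitted"


inst = FakeInstance()
print(sensor_then_submit(inst, lambda: inst.submit("competing_op")))
# -> 409 after sensor success: our_import rejected: competing_op is running
```

However long the sensor waits, its success only describes the instance at poke time; nothing stops a competing submit from landing in the gap.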

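Retrying at the submit point can be sketched as a decorator that retries only on a 409 whose reason is `operationInProgress` and re-raises everything else. This is an illustrative sketch under stated assumptions, not the PR's `operation_in_progress_retry`: `ApiError` is a hypothetical stand-in for the real HTTP error type, `retry_on_operation_in_progress` and `flaky_submit` are hypothetical names, and the attempt count and exponential backoff values are arbitrary. `sleep` is injectable so the example runs instantly.

```python
from __future__ import annotations

import functools
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class ApiError(Exception):
    """Hypothetical stand-in for an HTTP error with a status code and reason."""

    def __init__(self, status: int, reason: str) -> None:
        super().__init__(f"{status} {reason}")
        self.status = status
        self.reason = reason


def retry_on_operation_in_progress(
    attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Hypothetical decorator: retry only 409 operationInProgress, re-raise the rest."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except ApiError as exc:
                    busy = exc.status == 409 and exc.reason == "operationInProgress"
                    if not busy or attempt == attempts:
                        raise  # other errors, or retries exhausted
                    # Exponential backoff: base_delay, 2x, 4x, ...
                    sleep(base_delay * 2 ** (attempt - 1))
            raise AssertionError("unreachable")

        return wrapper

    return decorator


# Usage: a fake submit that is rejected twice while the instance is busy.
delays: list[float] = []
calls = {"n": 0}


@retry_on_operation_in_progress(attempts=5, base_delay=1.0, sleep=delays.append)
def flaky_submit() -> str:
    calls["n"] += 1
    if calls["n"] <= 2:
        raise ApiError(409, "operationInProgress")
    return "operation-id"


print(flaky_submit(), calls["n"], delays)  # -> operation-id 3 [1.0, 2.0]
```

Because the 409 is decided by the server at the moment of submission, retrying there leaves no gap between check and act: a losing submit just backs off and tries again. A production version would likely also add jitter so that parallel tasks that collide do not retry in lockstep.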