shahar1 commented on PR #67052:
URL: https://github.com/apache/airflow/pull/67052#issuecomment-4520683347

   > Hi @shahar1 -- addressed the system-test concern.
   > 
   > Why PR #61046 had to be reverted (revert commit 
[14995f1](https://github.com/apache/airflow/commit/14995f1967f299bf4ecb16133fb1c18f49ec1500)):
 the example called CloudComposerDAGRunSensor without execution_range, so the 
sensor inferred the window [logical_date - 1d, logical_date]. The Composer 
environment was created mid-DAG-run, which meant no airflow_monitoring runs 
ever fell inside that past window. The buggy code returned True (false 
positive); the empty-window fix correctly returns False and the example then 
hung forever.
   > 
   > The example patch in the earlier revision of this PR tried to side-step 
the issue with execution_range=[datetime.now() - timedelta(1), datetime.now()], 
but that's evaluated at DAG-parse time -- roughly half an hour before the 
sensor pokes and before the Composer env exists. Same hang mode, just hidden.
   > 
   > What this commit changes:
   > 
   > * composer_dag_run_id added to CloudComposerDAGRunSensor.template_fields.
   > * Example reordered so each CloudComposerTriggerDAGRunOperator runs before 
its sibling Dag-run sensor; sensors pull the freshly minted dag_run_id via 
XCom. The run-id branch is pure id+state equality and bypasses execution_range 
entirely, so the hang mode cannot recur on the new path.
   > * Defensive timeout=60*60 on the Dag-run sensors so any future regression 
to a windowed code path fails CI fast rather than hanging the worker.
   > * External-task sensors deliberately left on the legacy windowed path -- 
the analogous _check_task_instances_states anti-pattern is tracked at 
[CloudComposerExternalTaskSensor succeeds when all task instances are outside 
execution_rangeĀ #67051](https://github.com/apache/airflow/issues/67051), and 
the example carries an inline note so the [CloudComposerExternalTaskSensor 
succeeds when all task instances are outside execution_rangeĀ 
#67051](https://github.com/apache/airflow/issues/67051) fix must reorder them 
at the same time.
   > 
   > Verification I can run locally without GCP credentials:
   > 
   > * six new sensor unit tests + two new trigger unit tests covering the 
composer_dag_run_id branch (template_fields, Jinja render, poke success/wait, 
range-vs-id precedence, parse-time source guard, trigger branch precedence, 
trigger polling) -- all 75 tests in 
providers/google/tests/unit/google/cloud/{sensors,triggers}/test_cloud_composer.py
 pass.
   > * dev/simulate_composer_system_test.py, a standalone harness that loads 
the example via importlib, mocks every GCP surface the example touches 
(googleapiclient + CloudComposerHook + CloudComposerAsyncHook + Environment / 
ImageVersion / ExecuteAirflowCommandResponse.to_dict), runs dag.test() 
end-to-end, and reports per-task state. The deferred trigger runs inline under 
dag.test(), so the deferrable sensor path is exercised too.
   > 
   > Simulation result:
   > 
   > ```
   > $ uv run --project providers/google python 
dev/simulate_composer_system_test.py
   > loading example dag from .../example_cloud_composer.py
   > registering dag bundle and serializing dag
   > running dag.test() on composer
   > OK: dag.test() completed for composer -- final state: success
   >   create_env                       -> success
   >   dag_run_sensor                   -> success
   >   defer_create_env                 -> success
   >   defer_dag_run_sensor             -> success
   >   defer_delete_env                 -> success
   >   defer_external_task_sensor       -> success
   >   defer_run_airflow_cli_cmd        -> success
   >   defer_trigger_dag_run            -> success
   >   defer_update_env                 -> success
   >   delete_env                       -> success
   >   external_task_sensor             -> success
   >   get_env                          -> success
   >   get_project_number               -> success
   >   image_versions                   -> success
   >   list_envs                        -> success
   >   run_airflow_cli_cmd              -> success
   >   trigger_dag_run                  -> success
   >   update_env                       -> success
   >   watcher                          -> skipped
   > ```
   > 
   > Both the sync dag_run_sensor and the deferred defer_dag_run_sensor matched 
on the trigger ops' run id via the composer_dag_run_id branch.
   > 
   > @VladaZakharova @MaksYermak -- would appreciate a confirmation pass 
against real Composer infra when convenient, since the simulation can only 
validate the control flow, not the actual REST API contracts.
   > 
   > Drafted-by: Claude Code (Opus 4.7 1M); reviewed by @Vamsi-klu before 
posting
   
   When asking to run the system tests, I always refer to a real GCP instance - 
simulating it against a mock isn't helpful (maybe with exception for very 
specific services in GCP that have emulators). 
   Therefore, please remove `dev/simulate_composer_system_test.py`.
   As you currently lack the means to run the system tests, I'll see if I'm 
able to help with running them (can't promise that it will happen soon).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to