shahar1 commented on PR #67052: URL: https://github.com/apache/airflow/pull/67052#issuecomment-4520683347
> Hi @shahar1 -- addressed the system-test concern. > > Why PR #61046 had to be reverted (revert commit [14995f1](https://github.com/apache/airflow/commit/14995f1967f299bf4ecb16133fb1c18f49ec1500)): the example called CloudComposerDAGRunSensor without execution_range, so the sensor inferred the window [logical_date - 1d, logical_date]. The Composer environment was created mid-DAG-run, which meant no airflow_monitoring runs ever fell inside that past window. The buggy code returned True (false positive); the empty-window fix correctly returns False and the example then hung forever. > > The example patch in the earlier revision of this PR tried to side-step the issue with execution_range=[datetime.now() - timedelta(1), datetime.now()], but that's evaluated at DAG-parse time -- roughly half an hour before the sensor pokes and before the Composer env exists. Same hang mode, just hidden. > > What this commit changes: > > * composer_dag_run_id added to CloudComposerDAGRunSensor.template_fields. > * Example reordered so each CloudComposerTriggerDAGRunOperator runs before its sibling Dag-run sensor; sensors pull the freshly minted dag_run_id via XCom. The run-id branch is pure id+state equality and bypasses execution_range entirely, so the hang mode cannot recur on the new path. > * Defensive timeout=60*60 on the Dag-run sensors so any future regression to a windowed code path fails CI fast rather than hanging the worker. > * External-task sensors deliberately left on the legacy windowed path -- the analogous _check_task_instances_states anti-pattern is tracked at [CloudComposerExternalTaskSensor succeeds when all task instances are outside execution_rangeĀ #67051](https://github.com/apache/airflow/issues/67051), and the example carries an inline note so the [CloudComposerExternalTaskSensor succeeds when all task instances are outside execution_rangeĀ #67051](https://github.com/apache/airflow/issues/67051) fix must reorder them at the same time. > > Verification I can run locally without GCP credentials: > > * six new sensor unit tests + two new trigger unit tests covering the composer_dag_run_id branch (template_fields, Jinja render, poke success/wait, range-vs-id precedence, parse-time source guard, trigger branch precedence, trigger polling) -- all 75 tests in providers/google/tests/unit/google/cloud/{sensors,triggers}/test_cloud_composer.py pass. > * dev/simulate_composer_system_test.py, a standalone harness that loads the example via importlib, mocks every GCP surface the example touches (googleapiclient + CloudComposerHook + CloudComposerAsyncHook + Environment / ImageVersion / ExecuteAirflowCommandResponse.to_dict), runs dag.test() end-to-end, and reports per-task state. The deferred trigger runs inline under dag.test(), so the deferrable sensor path is exercised too. > > Simulation result: > > ``` > $ uv run --project providers/google python dev/simulate_composer_system_test.py > loading example dag from .../example_cloud_composer.py > registering dag bundle and serializing dag > running dag.test() on composer > OK: dag.test() completed for composer -- final state: success > create_env -> success > dag_run_sensor -> success > defer_create_env -> success > defer_dag_run_sensor -> success > defer_delete_env -> success > defer_external_task_sensor -> success > defer_run_airflow_cli_cmd -> success > defer_trigger_dag_run -> success > defer_update_env -> success > delete_env -> success > external_task_sensor -> success > get_env -> success > get_project_number -> success > image_versions -> success > list_envs -> success > run_airflow_cli_cmd -> success > trigger_dag_run -> success > update_env -> success > watcher -> skipped > ``` > > Both the sync dag_run_sensor and the deferred defer_dag_run_sensor matched on the trigger ops' run id via the composer_dag_run_id branch. > > @VladaZakharova @MaksYermak -- would appreciate a confirmation pass against real Composer infra when convenient, since the simulation can only validate the control flow, not the actual REST API contracts. > > Drafted-by: Claude Code (Opus 4.7 1M); reviewed by @Vamsi-klu before posting When asking to run the system tests, I always refer to a real GCP instance - simulating it against a mock isn't helpful (maybe with exception for very specific services in GCP that have emulators). Therefore, please remove `dev/simulate_composer_system_test.py`. As you currently lack the means to run the system tests, I'll see if I'm able to help with running them (can't promise that it will happen soon). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
