ajayganti3 opened a new issue, #51213: URL: https://github.com/apache/airflow/issues/51213
### Apache Airflow Provider(s)

amazon

### Versions of Apache Airflow Providers

apache-airflow-providers-amazon 9.8.0

### Apache Airflow version

3.0.1

### Operating System

Ubuntu 22.04.3 LTS

### Deployment

Docker-Compose

### Deployment details

I'm using the official Airflow Docker Compose file to run Airflow 3.0.1.

### What happened

I have a DAG configured with an Amazon SQS trigger, using Airflow 3.0.1 and the Airflow assets approach to auto-trigger DAGs when a message arrives in the queue.

The DAG works as expected for a short period (approximately 30 minutes), triggering whenever a message is sent to the SQS queue. After this initial period:

- The DAG stops triggering even though new messages are available in the SQS queue.
- Restarting Airflow with `docker compose restart` temporarily restores triggering.
- After another ~30 minutes, the problem recurs.
- In some cases the DAG never triggers again, even after a restart.
- The `aws_default` connection used for the SQS integration remains valid and operational (confirmed via manual testing).

### What you think should happen instead

The SQS-triggered DAG should:

- Continuously monitor the queue and trigger the DAG when a new message arrives.
- Remain functional indefinitely without needing to restart Airflow.
- Provide meaningful logs or error messages if and when it stops functioning.

### How to reproduce

1. Use the official Docker Compose file for Airflow 3.0.1.
2. Deploy with Docker Compose and start the Airflow environment.
3. Define a DAG with an SQS trigger using Airflow assets.
4. Set up a valid `aws_default` connection with appropriate IAM permissions.
5. Send messages to the SQS queue and observe that the DAG is triggered.
6. Wait a while, then send another message (or use a separate Python script that sends messages to the queue at regular intervals).
7. Observe that the DAG is no longer triggered.
8. Restart Airflow with `docker compose restart` and observe that triggering works again briefly, then fails. Sometimes the DAG is never triggered again, even after the restart.
9. Below is the Airflow DAG that I'm using.

```python
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher, dag, task

# Define the SQS queue URL
# Replace my_account_id and my_queue_name
SQS_QUEUE = "https://sqs.us-east-1.amazonaws.com/<my_account_id>/<my_queue_name>"

# Define a trigger that listens to an external message queue (AWS SQS in this case)
trigger = MessageQueueTrigger(
    aws_conn_id="aws_default",
    queue=SQS_QUEUE,
    waiter_delay=10,  # delay in seconds between polls
)

# Define an asset that watches for messages on the queue
sqs_queue_asset = Asset(
    "sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)]
)


# Schedule the DAG to run when the asset is triggered
@dag(schedule=[sqs_queue_asset])
def event_driven_dag():
    @task
    def process_message(**context):
        # Extract the triggering asset events from the context
        triggering_asset_events = context["triggering_asset_events"]
        for event in triggering_asset_events[sqs_queue_asset]:
            # Get the message from the TriggerEvent payload.
            # Single quotes inside the f-string keep this valid on Python < 3.12.
            print(
                f"Processing message: {event.extra['payload']['message_batch'][0]['Body']}"
            )

    process_message()


event_driven_dag()
```

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
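For anyone trying to reproduce this, the "separate Python script" from step 6 could look roughly like the sketch below. It is not part of the original setup. The helper names `send_periodic_messages` and `queue_depth` are made up for illustration, and the script assumes `boto3` is installed with credentials equivalent to the `aws_default` connection. The second helper reads the queue's approximate message counts, which may help confirm that messages are accumulating while the DAG is not being triggered.

```python
import time
from datetime import datetime, timezone


def send_periodic_messages(sqs_client, queue_url, count=5, interval_seconds=60.0, sleep=time.sleep):
    """Send `count` timestamped test messages, pausing between sends.

    `sqs_client` is expected to behave like boto3.client("sqs").
    Returns the MessageId of every message that was sent.
    """
    message_ids = []
    for i in range(count):
        sent_at = datetime.now(timezone.utc).isoformat()
        response = sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody=f"test message {i} sent at {sent_at}",
        )
        message_ids.append(response["MessageId"])
        # No need to wait after the final message.
        if i < count - 1:
            sleep(interval_seconds)
    return message_ids


def queue_depth(sqs_client, queue_url):
    """Return the approximate number of visible and in-flight messages."""
    response = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=[
            "ApproximateNumberOfMessages",
            "ApproximateNumberOfMessagesNotVisible",
        ],
    )
    attributes = response["Attributes"]  # SQS returns the counts as strings
    return {
        "visible": int(attributes["ApproximateNumberOfMessages"]),
        "in_flight": int(attributes["ApproximateNumberOfMessagesNotVisible"]),
    }


# Example usage (requires boto3 and AWS credentials; values are placeholders):
#   import boto3
#   client = boto3.client("sqs", region_name="us-east-1")
#   send_periodic_messages(client, SQS_QUEUE, count=10, interval_seconds=300)
#   print(queue_depth(client, SQS_QUEUE))
```

If `visible` keeps growing while no DAG runs appear, the messages are probably not being received at all. A non-zero `in_flight` count would instead suggest that something is still receiving messages without deleting them. Both counts are approximate, so treat them as a hint rather than proof.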
