victorrgez opened a new issue, #36510: URL: https://github.com/apache/airflow/issues/36510
### Apache Airflow version

Other Airflow 2 version (please specify below)

### If "Other Airflow 2 version" selected, which one?

2.6.3

### What happened?

We need to create custom triggers [1] and use them in our production environment with Cloud Composer (`composer-2.5.3-airflow-2.6.3`). We normally work with packaged DAGs in zip format, which contain all the required dependencies for each DAG.

The issue we have is that, once our custom operators are deferred, the tasks fail because the triggerer cannot create the custom trigger class. After investigating, it seems our issue is related to the fact that the triggerer does not have the code from our zip files. Furthermore, placing the libraries in the DAGs folder outside the zip files did not help us either; per the Composer docs [2]: "Because the DAGs folder is not synchronized with the Airflow triggerer, the inlined trigger code is missing when the trigger is executed."

In our case, instead of the message in [2] (`ImportError: Module "PACKAGE_NAME" does not define a "CLASS_NAME" attribute/class`), what we get is `ModuleNotFoundError: No module named 'common'`.

The only solution we have found so far is to create a custom PyPI package and install it in the whole Cloud Composer environment as specified in [2]. However, we cannot afford to restart the whole Cloud Composer environment to update the PyPI dependencies every time we update the code, fix a bug, etc.

Is there any way to install/update the custom trigger code without needing an environment restart in production? Would it be possible to synchronise the DAGs folder with the triggerer as well? We have not found much documentation about this, so any help will be greatly appreciated!

[1]: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html#writing-triggers
[2]: https://cloud.google.com/composer/docs/composer-2/troubleshooting-triggerer#trigger_class_not_found

### What you think should happen instead?

There should be an easy and quick way of updating the code of custom triggers without needing to restart the production environment to update a PyPI package every time a line of code changes.

### How to reproduce

1. Create a custom trigger and an operator that uses that trigger in deferrable mode (attaching an example below; an illustrative operator sketch follows it).
2. Put the code for that trigger in either a packaged DAG (zip file) or directly in the DAGs folder.
3. When the task is deferred, the triggerer fails with `ModuleNotFoundError`, as the trigger code is not synchronized with the triggerer.

### Operating System

composer-2.5.3-airflow-2.6.3

### Versions of Apache Airflow Providers

apache-airflow==2.6.3
apache-airflow-providers-google==10.10.1

### Deployment

Google Cloud Composer

### Deployment details

Attaching sample code from one of our triggers (censoring the provider name as it is confidential):

```
from typing import Any, Dict, Tuple

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.triggers.base import BaseTrigger, TriggerEvent


class BigQueryPartitionExistsTrigger(BaseTrigger):
    def __init__(
        self,
        *,
        project_id: str,
        dataset_id: str,
        table_id: str,
        partition_date: str,
        gcp_conn_id: str = "google_cloud_default",
        use_legacy_sql: bool = False,
        location: str = "EU",
        defer_seconds: int = 300,
    ) -> None:
        super().__init__()
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.partition_date = partition_date
        self.gcp_conn_id = gcp_conn_id
        self.use_legacy_sql = use_legacy_sql
        self.location = location
        self.defer_seconds = defer_seconds

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        """
        Specifies the instructions for reconstructing the class inside the
        Airflow triggerer process. It needs to know the classpath and the
        dict of kwargs to be provided to the `__init__` method.

        This trigger is not working at the moment as the DAGs folder is not
        synchronised with the triggerer. You need to create a new package
        and install it inside Composer with PyPI.
        """
        return (
            "common.airflow.providers.<CENSORED>.custom_triggers.BigQueryTriggers.BigQueryPartitionExistsTrigger",
            {
                "project_id": self.project_id,
                "dataset_id": self.dataset_id,
                "table_id": self.table_id,
                "partition_date": self.partition_date,
                "gcp_conn_id": self.gcp_conn_id,
                "use_legacy_sql": self.use_legacy_sql,
                "location": self.location,
                "defer_seconds": self.defer_seconds,
            },
        )

    async def run(self):
        from asyncio import sleep as async_sleep

        print(f"About to create hook: {self.gcp_conn_id=}")
        bq_hook = BigQueryHook(
            gcp_conn_id=self.gcp_conn_id,
            use_legacy_sql=self.use_legacy_sql,
            location=self.location,
        )
        while not bq_hook.table_partition_exists(
            project_id=self.project_id,
            dataset_id=self.dataset_id,
            table_id=self.table_id,
            partition_id=self.partition_date,
        ):
            await async_sleep(self.defer_seconds)
        yield TriggerEvent({"status": "success"})
```
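For completeness, this is roughly what the deferrable operator side looks like (a simplified sketch with illustrative names, not our exact production code):

```
from airflow.models.baseoperator import BaseOperator

# The trigger is imported from the same censored package as above, e.g.:
# from common.airflow.providers.<CENSORED>.custom_triggers.BigQueryTriggers import (
#     BigQueryPartitionExistsTrigger,
# )


class BigQueryPartitionExistsOperator(BaseOperator):
    """Defers until the given BigQuery partition exists (illustrative sketch)."""

    def __init__(
        self,
        *,
        project_id: str,
        dataset_id: str,
        table_id: str,
        partition_date: str,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.partition_date = partition_date
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        # Hand the wait over to the triggerer; the worker slot is freed here.
        self.defer(
            trigger=BigQueryPartitionExistsTrigger(
                project_id=self.project_id,
                dataset_id=self.dataset_id,
                table_id=self.table_id,
                partition_date=self.partition_date,
                gcp_conn_id=self.gcp_conn_id,
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # Runs back on a worker once the trigger yields its TriggerEvent.
        return event
```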
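And for anyone triaging this: our understanding is that the triggerer rebuilds the trigger from the `(classpath, kwargs)` pair returned by `serialize()`, roughly like the following (an illustrative sketch, not the exact Airflow internals), which is why the `common` package must be importable inside the triggerer's own environment:

```
from airflow.utils.module_loading import import_string

# The (classpath, kwargs) pair comes from BaseTrigger.serialize() via the
# metadata database; SOME_PROVIDER stands in for the censored package segment
# and the kwarg values are illustrative.
classpath = (
    "common.airflow.providers.SOME_PROVIDER.custom_triggers"
    ".BigQueryTriggers.BigQueryPartitionExistsTrigger"
)
kwargs = {
    "project_id": "my-project",
    "dataset_id": "my_dataset",
    "table_id": "my_table",
    "partition_date": "20240101",
}

# This import is what fails in the triggerer when the module only exists in
# the (unsynchronised) DAGs folder or inside a packaged-DAG zip:
# ModuleNotFoundError: No module named 'common'
trigger_cls = import_string(classpath)
trigger = trigger_cls(**kwargs)
```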
### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)