victorrgez opened a new issue, #36510:
URL: https://github.com/apache/airflow/issues/36510

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.6.3
   
   ### What happened?
   
   We need to create custom triggers [1] and use them in our production environment with Cloud Composer (`composer-2.5.3-airflow-2.6.3`). We normally work with packaged DAGs in zip format, which bundle all the required dependencies for each DAG.
   
   The issue we have is that, once our custom operators are deferred, the tasks fail because the triggerer cannot instantiate the custom trigger class. After investigating, it seems the problem is that the triggerer does not have the code from our zip files (moreover, placing the libraries in the DAGs folder outside the zip files did not help either: `Because the DAGs folder is not synchronized with the Airflow triggerer, the inlined trigger code is missing when the trigger is executed.` [2]).
   
   In our case, instead of the message in [2] (`ImportError: Module "PACKAGE_NAME" does not define a "CLASS_NAME" attribute/class`), what we get is `ModuleNotFoundError: No module named 'common'`.
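
   For context on why a `ModuleNotFoundError` shows up: the triggerer only receives the dotted classpath and kwargs that `serialize()` produced, and then re-imports the class itself. Below is a minimal sketch of that rehydration step (simplified on our side; the real deserialization lives inside Airflow, and the classpath is just an illustrative stand-in):

   ```python
   # Simplified sketch of how the triggerer rebuilds a trigger from the
   # (classpath, kwargs) pair produced by serialize(). The real logic lives
   # inside Airflow; this only shows where the import fails.
   from importlib import import_module


   def rehydrate_trigger(classpath: str, kwargs: dict):
       """Import the trigger's module and instantiate the class with kwargs."""
       module_path, class_name = classpath.rsplit(".", 1)
       # This import only succeeds if the module is on the *triggerer's*
       # Python path -- code shipped inside zip-packaged DAGs is not.
       module = import_module(module_path)
       return getattr(module, class_name)(**kwargs)


   try:
       # Mirrors the shape of the classpath our serialize() method returns
       rehydrate_trigger("common.airflow.providers.x.MyTrigger", {})
   except ModuleNotFoundError as exc:
       print(exc)  # -> No module named 'common'
   ```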
   
   The only solution we have found so far is to create a custom PyPI package and install it in the whole Cloud Composer environment, as specified in [2]. However, we cannot afford to restart the whole Cloud Composer environment to update the PyPI dependencies every time we update the code, fix a bug, etc.
   
   Is there any way to install or update the custom trigger code without needing an environment restart in production? Would it be possible to synchronise the DAGs folder with the triggerer as well? We have not found much documentation about this, so any help will be greatly appreciated!
   
   [1]: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html#writing-triggers
   [2]: https://cloud.google.com/composer/docs/composer-2/troubleshooting-triggerer#trigger_class_not_found
   
   ### What you think should happen instead?
   
   There should be an easy and quick way to update the code of custom triggers without needing to restart the production environment to update a PyPI package every time a line of code changes.
   
   ### How to reproduce
   
   1. Create a custom trigger and an operator that uses that trigger in deferrable mode (attaching an example below)
   2. Put the code for that trigger either in a packaged DAG (zip file) or directly in the DAGs folder
   3. When the task is deferred, the triggerer fails with `ModuleNotFoundError` because the trigger class code is not synchronized with the triggerer
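
   The worker side of step 3 can be sketched in plain Python (a simplification on our side: in real Airflow, `BaseOperator.defer()` raises a `TaskDeferred` exception carrying the serialized trigger; the classpath and kwargs below are illustrative, not real):

   ```python
   # Plain-Python sketch of the deferral handshake, simplified from Airflow's
   # TaskDeferred mechanism. It illustrates that only metadata crosses the
   # process boundary -- never the trigger's source code.
   class TaskDeferred(Exception):
       """Carries everything the triggerer needs to resume the task."""

       def __init__(self, classpath: str, kwargs: dict, method_name: str):
           super().__init__(classpath)
           self.classpath = classpath      # dotted path, NOT the code itself
           self.kwargs = kwargs            # init arguments for the trigger
           self.method_name = method_name  # operator method to call on resume


   def execute(context: dict):
       # The worker serializes only classpath + kwargs; the triggerer must be
       # able to import the classpath on its own, which is where we fail.
       raise TaskDeferred(
           classpath="common.airflow.triggers.MyTrigger",  # illustrative
           kwargs={"defer_seconds": 300},
           method_name="execute_complete",
       )


   try:
       execute({})
   except TaskDeferred as td:
       print(td.classpath)  # the triggerer receives this string, nothing more
   ```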
   
   ### Operating System
   
   composer-2.5.3-airflow-2.6.3
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow==2.6.3
   apache-airflow-providers-google==10.10.1
   
   ### Deployment
   
   Google Cloud Composer
   
   ### Deployment details
   
   Attaching sample code from one of our triggers (censoring the provider name, as it is confidential):
   
   ```python
   from typing import Any, Dict, Tuple

   from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
   from airflow.triggers.base import BaseTrigger, TriggerEvent


   class BigQueryPartitionExistsTrigger(BaseTrigger):

       def __init__(self, *, project_id: str, dataset_id: str, table_id: str,
                    partition_date: str, gcp_conn_id: str = "google_cloud_default",
                    use_legacy_sql: bool = False, location: str = "EU",
                    defer_seconds: int = 300) -> None:
           super().__init__()
           self.project_id = project_id
           self.dataset_id = dataset_id
           self.table_id = table_id
           self.partition_date = partition_date
           self.gcp_conn_id = gcp_conn_id
           self.use_legacy_sql = use_legacy_sql
           self.location = location
           self.defer_seconds = defer_seconds

       def serialize(self) -> Tuple[str, Dict[str, Any]]:
           """
           Specify the instructions for reconstructing this class inside the
           Airflow triggerer process: the classpath plus the kwargs to pass to
           the `__init__` method.

           This trigger is not working at the moment, as the DAGs folder is not
           synchronised with the triggerer. You need to create a new package and
           install it inside Composer with PyPI.
           """
           return ("common.airflow.providers.<CENSORED>.custom_triggers.BigQueryTriggers.BigQueryPartitionExistsTrigger",
                   {"project_id": self.project_id,
                    "dataset_id": self.dataset_id,
                    "table_id": self.table_id,
                    "partition_date": self.partition_date,
                    "gcp_conn_id": self.gcp_conn_id,
                    "use_legacy_sql": self.use_legacy_sql,
                    "location": self.location,
                    "defer_seconds": self.defer_seconds})

       async def run(self):
           import asyncio

           self.log.info("About to create hook: gcp_conn_id=%s", self.gcp_conn_id)
           bq_hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id,
                                  use_legacy_sql=self.use_legacy_sql,
                                  location=self.location)

           # Note: table_partition_exists is a synchronous call, so it runs on
           # (and briefly blocks) the triggerer's event loop between sleeps.
           while not bq_hook.table_partition_exists(
                       project_id=self.project_id,
                       dataset_id=self.dataset_id,
                       table_id=self.table_id,
                       partition_id=self.partition_date
                   ):
               await asyncio.sleep(self.defer_seconds)
           yield TriggerEvent({"status": "success"})
   ```
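
   One side note on the `run` method above, unrelated to the import problem: `table_partition_exists` is a synchronous call, so it blocks the triggerer's event loop while it runs. A possible pattern (our assumption, not something Airflow requires) is to push the blocking check onto a worker thread with `asyncio.to_thread`:

   ```python
   import asyncio


   async def poll_until(check, interval: float) -> None:
       """Run a blocking `check` callable off the event loop until it's true."""
       # asyncio.to_thread (Python 3.9+) executes the synchronous callable in
       # a worker thread so other triggers keep running in the meantime.
       while not await asyncio.to_thread(check):
           await asyncio.sleep(interval)


   # Stand-in predicate that becomes true on the second call, mimicking a
   # partition that appears while we poll:
   calls = []

   def partition_ready() -> bool:
       calls.append(1)
       return len(calls) >= 2


   asyncio.run(poll_until(partition_ready, 0.01))
   print(len(calls))  # -> 2
   ```

   Inside the trigger, `check` would be something like a `functools.partial` wrapping the `bq_hook.table_partition_exists(...)` call.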
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
