deepak4babu commented on issue #51887:
URL: https://github.com/apache/airflow/issues/51887#issuecomment-2987222906

   @jroachgolf84 
   Here is a sample of what we are doing
   
   sample_dag.py
   ```python
   # sample_dag.py
   # Minimal DAG showing the env-var based team routing that breaks under the triggerer.
   import os

   # NOTE(review): set before importing utils so blob.adapters (imported inside
   # utils) reads the team id at import time — confirm this ordering is required.
   os.environ["TEAM_ID"] = "<some id>"

   from airflow.decorators import task, dag
   from utils import (
       BlobFileExistsOperator,
       upload_file_for_processing,
       download_processed_file,
       check_file_exists,
   )

   @dag(
       ...
   )
   def sample():

       @task()
       def task1():
           # Push the input file to the team's blob store.
           upload_file_for_processing(file_name="sample")

       task2 = BlobFileExistsOperator(
           deferrable=True
       )

       @task()
       def task3():
           # Pull the processed result back down.
           download_processed_file(file_name="sample")

       # BUG fix: task dependencies are declared with the >> (rshift) operator;
       # plain > is an ordinary comparison and sets no dependency at all.
       task1() >> task2 >> task3

   sample()
   ```
   utils.py
   ```python
   import asyncio

   from airflow.sensors.base import BaseSensorOperator
   from airflow.triggers.base import BaseTrigger, TriggerEvent

   # The blob.adapters module below reads os.environ to get the team_id and
   # connects to the respective blobs for file processing.
   from blob.adapters import (
       upload_file_to_blob,
       download_file_from_blob,
       check_file_exists_in_blob,
       validate_file_using_validator_service,
   )
   
   def upload_file_for_processing(file_name):
       """Upload *file_name* to the team's blob store for processing.

       BUG fix: parameter renamed from ``f`` to ``file_name`` so the DAG's
       keyword call ``upload_file_for_processing(file_name=...)`` works.
       """
       upload_file_to_blob(file_name)
   
   def download_processed_file(file_name):
       """Download the processed *file_name* from the blob store and return it.

       BUG fix: parameter renamed from ``f`` to ``file_name`` so the DAG's
       keyword call ``download_processed_file(file_name=...)`` works.
       """
       return download_file_from_blob(file_name)
   
   def check_file_exists(f):
       """Return the blob-store existence check result for *f*."""
       result = check_file_exists_in_blob(f)
       return result
   
   def validate_file(f):
       """Return the validator-service result for *f*."""
       outcome = validate_file_using_validator_service(f)
       return outcome
   
   class BlobFileExistsOperator(BaseSensorOperator):
       """Sensor that waits for the processed file to appear in the blob store.

       With ``deferrable=True`` it frees the worker slot by handing the wait
       off to ``BlobFileExistsTrigger`` on the triggerer.
       """

       def __init__(self, *, deferrable=False, **kwargs):
           # BUG fix: the placeholder __init__ accepted no arguments, so the
           # DAG's BlobFileExistsOperator(deferrable=True) raised TypeError and
           # BaseSensorOperator was never initialized.
           super().__init__(**kwargs)
           self.deferrable = deferrable

       def execute(self, context) -> None:
           """Airflow runs this method on the worker and defers using the trigger."""
           import os
           if self.deferrable:
               self.defer(
                   trigger=BlobFileExistsTrigger(
                       file_name="processed_sample_file",
                       # NOTE(review): assumes TEAM_ID is set in the worker
                       # environment (the DAG file sets it at import) — confirm.
                       team_id=os.environ["TEAM_ID"],
                   ),
                   method_name="execute_complete",
               )
           else:
               super().execute(context=context)

       def execute_complete(self, context, event=None) -> None:
           # BUG fix: self.defer() names this callback; without it the task
           # fails when the trigger fires and the task resumes.
           return None
               
   class BlobFileExistsTrigger(BaseTrigger):
       """Triggerer-side wait: file exists in blob store, then validates.

       BUG fix: Airflow triggers must subclass
       ``airflow.triggers.base.BaseTrigger`` so the triggerer can rehydrate
       them from ``serialize()``; the original plain class cannot be
       re-instantiated. Also, ``TriggerEvent`` was never imported, so the
       final ``yield`` raised NameError.
       """

       def __init__(self, file_name, team_id):
           super().__init__()
           self.file_name = file_name  # blob object name to poll for
           self.team_id = team_id      # team routing id read by blob.adapters

       def serialize(self):
           """Return the (classpath, kwargs) pair the triggerer stores and replays."""
           return (
               "utils.BlobFileExistsTrigger",
               {
                   "file_name": self.file_name,
                   "team_id": self.team_id,
               },
           )

       async def run(self):
           import os

           # WARNING: os.environ is process-global and every deferred trigger
           # shares the single triggerer process, so this write bleeds the team
           # id into other teams' coroutines — this is the reported issue.
           # Prefer passing team_id explicitly into the blob helpers once
           # blob.adapters supports it.
           os.environ["TEAM_ID"] = self.team_id

           from utils import check_file_exists, validate_file

           # NOTE(review): ``await`` here assumes the blob.adapters calls
           # return awaitables — confirm; awaiting a plain value raises
           # TypeError, which the broad except below would silently retry
           # forever.
           while True:
               try:
                   await check_file_exists(self.file_name)
               except Exception:
                   await asyncio.sleep(10)
               else:
                   break

           while True:
               try:
                   await validate_file(self.file_name)
               except Exception:
                   await asyncio.sleep(10)
               else:
                   break

           yield TriggerEvent(True)
   ```
   
   The blob.adapters package imported in utils.py uses the environment variable to
   connect to the respective blobs. In the triggerer, setting an environment
   variable will not work, because it bleeds into other teams' async coroutines
   and causes issues.
   
   We tried using contextvars instead of the environment variable, but the
   context variable comes back as None when read inside blob.adapters.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to