deepak4babu commented on issue #51887:
URL: https://github.com/apache/airflow/issues/51887#issuecomment-2987222906
@jroachgolf84
Here is a sample of what we are doing:
sample_dag.py
```python
# sample_dag.py
import os

# NOTE(review): this assignment runs whenever this file is imported (i.e. at
# DAG-parse time) and os.environ is process-global, so any other code in the
# same process -- including other DAG files parsed there -- sees whatever value
# was written last. Prefer passing the team id explicitly where callees allow it.
os.environ["TEAM_ID"] = "<some id>"

from airflow.decorators import dag, task
from utils import (
    BlobFileExistsOperator,
    check_file_exists,
    download_processed_file,
    upload_file_for_processing,
)


@dag(
    ...
)
def sample():
    """Upload a file, wait (deferred) until it is processed, then download it."""

    @task()
    def task1():
        # The utils helpers name their parameter ``f``; pass the file positionally.
        upload_file_for_processing("sample")

    # BaseOperator requires a task_id for every classic operator.
    task2 = BlobFileExistsOperator(
        task_id="task2",
        deferrable=True,
    )

    @task()
    def task3():
        download_processed_file("sample")

    # ``>>`` declares the dependency chain (``>`` is only a comparison), and the
    # TaskFlow function ``task3`` must be called to create a task instance.
    task1() >> task2 >> task3()


sample()
```
utils.py
```python
import asyncio
import contextlib
import inspect
import os

from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent

# The blob.adapters module below reads os.environ to get the team_id and
# connects to the respective team's blob storage for file processing.
from blob.adapters import (
    check_file_exists_in_blob,
    download_file_from_blob,
    upload_file_to_blob,
    validate_file_using_validator_service,
)
def upload_file_for_processing(f):
    """Upload *f* through ``blob.adapters.upload_file_to_blob``.

    The adapter's return value is discarded; this wrapper always returns None.
    """
    upload_file_to_blob(f)
    return None
def download_processed_file(f):
    """Download *f* via ``blob.adapters.download_file_from_blob``.

    Returns whatever the adapter returns, unchanged.
    """
    return download_file_from_blob(f)
def check_file_exists(f):
    """Delegate to ``blob.adapters.check_file_exists_in_blob`` for *f*.

    The adapter's result is passed straight back. NOTE(review): callers that
    ``await`` this need the adapter to return an awaitable -- confirm whether
    it is a coroutine function.
    """
    outcome = check_file_exists_in_blob(f)
    return outcome
def validate_file(f):
    """Run *f* through ``blob.adapters.validate_file_using_validator_service``.

    The adapter's result is passed straight back. NOTE(review): callers that
    ``await`` this need the adapter to return an awaitable -- confirm.
    """
    outcome = validate_file_using_validator_service(f)
    return outcome
class BlobFileExistsOperator(BaseSensorOperator):
    """Sensor that waits for a processed file in a team's blob storage.

    With ``deferrable=True`` the wait is handed to the triggerer through
    ``BlobFileExistsTrigger``; otherwise ``BaseSensorOperator.execute`` runs
    on the worker.

    :param file_name: blob name to wait for.
    :param team_id: team whose storage is checked. When omitted, the
        ``TEAM_ID`` environment variable of the process running ``execute``
        is used (raises ``KeyError`` if it is unset).
    :param deferrable: defer to the triggerer instead of poking on the worker.
    :param kwargs: forwarded to ``BaseSensorOperator`` (``task_id``, ...).
    """

    def __init__(
        self,
        *,
        file_name="processed_sample_file",
        team_id=None,
        deferrable=False,
        **kwargs,
    ):
        # ``deferrable`` is consumed here rather than forwarded, so that the
        # base operator only receives arguments it understands.
        super().__init__(**kwargs)
        self.file_name = file_name
        self.team_id = team_id
        self.deferrable = deferrable

    def execute(self, context) -> None:
        """Defer to the triggerer, or run the regular sensor loop when not deferrable."""
        if not self.deferrable:
            # NOTE(review): BaseSensorOperator.execute relies on a ``poke``
            # method, which is not visible here -- confirm one is defined.
            super().execute(context=context)
            return

        # Resolve the team on the worker and ship it to the trigger as a
        # serialized kwarg: the triggerer process is shared by many triggers,
        # so its own environment must not decide which team is used.
        team_id = self.team_id or os.environ["TEAM_ID"]
        self.defer(
            trigger=BlobFileExistsTrigger(
                file_name=self.file_name,
                team_id=team_id,
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None) -> None:
        """Resume after the trigger fires; fail unless it reported success.

        :param event: payload of the ``TriggerEvent`` (the trigger sends ``True``).
        :raises RuntimeError: if the payload is anything other than ``True``.
        """
        if event is not True:
            raise RuntimeError(
                f"BlobFileExistsTrigger returned unexpected event {event!r} "
                f"for {self.file_name!r}"
            )
class BlobFileExistsTrigger(BaseTrigger):
    """Wait in the triggerer until a blob exists and validates, then fire once.

    ``blob.adapters`` selects the team's storage from the ``TEAM_ID``
    environment variable. The triggerer runs many triggers as coroutines in a
    single process, so a bare ``os.environ`` assignment made by one trigger is
    visible to -- and can be overwritten by -- every other trigger at any
    ``await`` point. To keep teams apart, every adapter call is made while
    holding a lock shared by all instances of this class, with ``TEAM_ID`` set
    for the duration of the call and restored afterwards.

    NOTE(review): this serializes blob calls across these triggers and only
    protects code that goes through this class; anything else in the
    triggerer that reads/writes ``TEAM_ID`` is not covered. The durable fix is
    for ``blob.adapters`` to accept the team id as an argument (or read a
    ``ContextVar`` at call time and propagate the context into any executor
    threads it uses, e.g. via ``asyncio.to_thread``).

    :param file_name: blob to wait for and then validate.
    :param team_id: team whose storage ``blob.adapters`` should target.
    :param poll_interval: seconds to sleep between failed attempts.
    """

    # Shared by every instance; created lazily on first use so it is made
    # while the triggerer's event loop is running.
    _env_lock = None

    def __init__(self, file_name, team_id, poll_interval=10):
        super().__init__()
        self.file_name = file_name
        self.team_id = team_id
        self.poll_interval = poll_interval

    def serialize(self):
        """Return the classpath and kwargs Airflow uses to rebuild this trigger."""
        return (
            "utils.BlobFileExistsTrigger",
            {
                "file_name": self.file_name,
                "team_id": self.team_id,
                "poll_interval": self.poll_interval,
            },
        )

    @staticmethod
    def _get_env_lock():
        """Return the class-wide lock guarding ``TEAM_ID``, creating it once."""
        if BlobFileExistsTrigger._env_lock is None:
            BlobFileExistsTrigger._env_lock = asyncio.Lock()
        return BlobFileExistsTrigger._env_lock

    @contextlib.asynccontextmanager
    async def _team_env(self):
        """Hold the shared lock with ``TEAM_ID`` set to this trigger's team."""
        async with self._get_env_lock():
            previous = os.environ.get("TEAM_ID")
            os.environ["TEAM_ID"] = self.team_id
            try:
                yield
            finally:
                # Put the process environment back as it was.
                if previous is None:
                    os.environ.pop("TEAM_ID", None)
                else:
                    os.environ["TEAM_ID"] = previous

    async def _call_for_team(self, func):
        """Call ``func(self.file_name)`` under this team's ``TEAM_ID``.

        Awaits the result only when it is awaitable, so both sync and async
        adapter functions work (awaiting a plain value raises ``TypeError``).
        A sync adapter blocks the event loop for the duration of the call.
        """
        async with self._team_env():
            result = func(self.file_name)
            if inspect.isawaitable(result):
                result = await result
        return result

    async def _wait_until_ok(self, func):
        """Retry ``func`` every ``poll_interval`` seconds until a call does not raise.

        NOTE(review): as before, the return value is ignored -- if the adapter
        signals "missing/invalid" by returning False instead of raising, this
        must also check the result. Confirm the adapter's contract.
        """
        while True:
            try:
                await self._call_for_team(func)
            except Exception:
                # Any failure means "not ready yet"; log it so permanent
                # errors are not retried silently forever. The sleep happens
                # outside the lock so other triggers can make progress.
                self.log.info(
                    "%s(%r) not ready for team %r; retrying in %s s",
                    func.__name__,
                    self.file_name,
                    self.team_id,
                    self.poll_interval,
                    exc_info=True,
                )
                await asyncio.sleep(self.poll_interval)
            else:
                return

    async def run(self):
        """Wait for the file to exist, then for it to validate, then fire ``True``."""
        # check_file_exists / validate_file are module-level helpers of utils,
        # the module this trigger is loaded from.
        await self._wait_until_ok(check_file_exists)
        await self._wait_until_ok(validate_file)
        yield TriggerEvent(True)
```
The `blob.adapters` package imported in `utils` uses the `TEAM_ID` environment
variable to connect to the respective team's blob storage. In the triggerer,
however, setting an environment variable does not work: all triggers run as
coroutines in the same process, so the value bleeds into other teams' async
coroutines and causes issues.
We tried using a `contextvars.ContextVar` instead of the environment variable,
but the context variable returns `None` inside `blob.adapters`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]