shreyaskj-0710 commented on issue #53896:
URL: https://github.com/apache/airflow/issues/53896#issuecomment-3162629101
Hi @jfallt,
**Why the dataset updates appear to be grouped**
Suppose a DAG depends on multiple datasets: aladdin_adx_coupon,
aladdin_adx_factor, aladdin_adx_issuer, aladdin_adx_security, and
aladdin_adx_shares_outstanding.
The schedule property of that DAG would then be:
schedule = (
    aladdin_adx_coupon | aladdin_adx_factor | aladdin_adx_issuer
    | aladdin_adx_security | aladdin_adx_shares_outstanding
)
Even if several datasets are updated at the same time, the DAG is triggered
only once, because the rule is:
"If one or more of them update, run once."
It doesn't matter whether one or five changed at the same time; the result
is a single run.
Optional analogy:
Think of a doorbell connected to 5 buttons. Pressing one or more buttons at
the same time rings the bell just once.
That's why the updates appear to be grouped.
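To make the "one or more updates, one run" rule concrete, here is a toy model in plain Python. It does not use Airflow and is not how the scheduler is implemented; the class AnyOfSchedule and its methods are hypothetical names made up for this illustration. The only assumption is that updates arriving before the next scheduling pass are collected into a single run.

```python
from dataclasses import dataclass, field
from typing import FrozenSet, List, Set


@dataclass
class AnyOfSchedule:
    """Toy model of an OR-combined dataset schedule (hypothetical, not Airflow code)."""

    watched: FrozenSet[str]
    pending: Set[str] = field(default_factory=set)
    runs: List[FrozenSet[str]] = field(default_factory=list)

    def record_update(self, dataset: str) -> None:
        # Queue the update; nothing runs until the next scheduling pass.
        if dataset in self.watched:
            self.pending.add(dataset)

    def scheduler_pass(self) -> None:
        # Create exactly one run if anything is pending, however many updates there were.
        if self.pending:
            self.runs.append(frozenset(self.pending))
            self.pending.clear()


schedule = AnyOfSchedule(
    watched=frozenset(
        {
            "aladdin_adx_coupon",
            "aladdin_adx_factor",
            "aladdin_adx_issuer",
            "aladdin_adx_security",
            "aladdin_adx_shares_outstanding",
        }
    )
)

# Three datasets update before the next scheduling pass.
for name in ("aladdin_adx_coupon", "aladdin_adx_factor", "aladdin_adx_issuer"):
    schedule.record_update(name)

schedule.scheduler_pass()
print(len(schedule.runs))  # 1
```

In this sketch the single run still remembers which datasets were pending, which is the information the consumer needs in order to tell the updates apart.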
**Having one DAG process different files depending on which dataset URI was
updated**
You can pass the dataset info **from the producer DAG** (the DAG that updates
the dataset) **to the consumer DAG** (the DAG that consumes the dataset)
**using the conf argument of TriggerDagRunOperator in the producer DAG**:
# Assumed Airflow 2.x import path; adjust it for your Airflow version.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_consumer = TriggerDagRunOperator(
    task_id="trigger_consumer_from_producer_a",
    trigger_dag_id="consumer_triggered_dag",  # ID of the consumer DAG
    conf={
        "source_producer": "producer_dag_trigger_a",
        "producer_run_id": "{{ run_id }}",
        "logical_date": "{{ logical_date.isoformat() }}",
        "data_source_info": "dataset_name",
    },
    wait_for_completion=False,
)
The consumer DAG can read the same conf values and perform the required
operations based on the dataset:
from typing import Any


def _log_trigger_info(**context: Any) -> None:
    """
    Log details about how this DAG run was triggered,
    especially via the 'conf' passed by TriggerDagRunOperator.
    """
    dag_run = context["dag_run"]
    # Access the conf dictionary passed by the TriggerDagRunOperator
    triggered_by_conf = dag_run.conf
    if triggered_by_conf:
        print("\n--- Triggering Info from Producer DAG ---")
        print(f"  Source Producer: {triggered_by_conf.get('source_producer', 'N/A')}")
        print(f"  Producer Run ID: {triggered_by_conf.get('producer_run_id', 'N/A')}")
        print(f"  Producer Logical Date: {triggered_by_conf.get('logical_date', 'N/A')}")
        # data_source_info carries the dataset name
        print(f"  Data Source Info: {triggered_by_conf.get('data_source_info', 'N/A')}")
        # Perform the required operation based on the dataset
    else:
        print("\n--- No 'conf' data passed to this triggered run ---")
        print("(This likely means it was manually triggered without conf, or a time-based run)")
    print("Consumer: Starting data processing based on received trigger info...")
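As a rough sketch of the "perform required operation based on dataset" step, the consumer could map the data_source_info value to a handler function. Everything here is hypothetical and independent of Airflow: the handler functions, the HANDLERS table, and the dispatch helper are illustrative names, and the sketch assumes the producer puts the dataset name into conf["data_source_info"] as in the operator example above.

```python
from typing import Any, Callable, Dict, Optional


def process_coupon(conf: Dict[str, Any]) -> str:
    # Placeholder for the real coupon file processing.
    return f"coupon files for run {conf.get('producer_run_id', 'N/A')}"


def process_factor(conf: Dict[str, Any]) -> str:
    # Placeholder for the real factor file processing.
    return f"factor files for run {conf.get('producer_run_id', 'N/A')}"


# Hypothetical mapping from dataset name to the function that handles it.
HANDLERS: Dict[str, Callable[[Dict[str, Any]], str]] = {
    "aladdin_adx_coupon": process_coupon,
    "aladdin_adx_factor": process_factor,
}


def dispatch(conf: Optional[Dict[str, Any]]) -> str:
    """Pick a handler based on the dataset name carried in conf."""
    if not conf:
        return "no conf: nothing to dispatch"
    dataset_name = conf.get("data_source_info")
    handler = HANDLERS.get(dataset_name)
    if handler is None:
        return f"unknown dataset: {dataset_name!r}"
    return handler(conf)
```

Inside _log_trigger_info, a call such as dispatch(dag_run.conf) could replace the placeholder comment. A lookup table keeps the consumer DAG unchanged when a new dataset is added; only a new entry in the mapping is needed.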