shreyaskj-0710 commented on issue #53896:
URL: https://github.com/apache/airflow/issues/53896#issuecomment-3162629101

Hi @jfallt,
   
**Why the dataset updates appear to be grouped**
   
Suppose a DAG depends on several datasets: `aladdin_adx_coupon`, `aladdin_adx_factor`, `aladdin_adx_issuer`, `aladdin_adx_security`, and `aladdin_adx_shares_outstanding`.
   
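If the DAG should run when any of them is updated, its `schedule` combines the `Dataset` objects with the `|` (OR) operator:

```python
# Each name is a Dataset object; this expression is passed as DAG(schedule=...)
schedule = (
    aladdin_adx_coupon
    | aladdin_adx_factor
    | aladdin_adx_issuer
    | aladdin_adx_security
    | aladdin_adx_shares_outstanding
)
```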
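
To make the OR semantics concrete, here is a toy model in plain Python. It is not Airflow's implementation: `ToyDataset` and `AnyOf` are hypothetical stand-ins, while in Airflow the `|` operator on real `Dataset` objects builds Airflow's own condition object. The point is only that the combined condition is satisfied as soon as at least one member dataset has been updated.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyDataset:
    """Hypothetical stand-in for an Airflow Dataset; only the URI matters here."""
    uri: str

    def __or__(self, other):
        # a | b builds an "any of" condition
        return AnyOf((self, other))

    def is_satisfied(self, updated):
        return self.uri in updated


@dataclass(frozen=True)
class AnyOf:
    """Hypothetical OR condition: satisfied when at least one member is satisfied."""
    members: tuple

    def __or__(self, other):
        # Chaining (a | b) | c flattens into a single AnyOf with three members
        return AnyOf((*self.members, other))

    def is_satisfied(self, updated):
        return any(m.is_satisfied(updated) for m in self.members)


schedule = (
    ToyDataset("aladdin_adx_coupon")
    | ToyDataset("aladdin_adx_factor")
    | ToyDataset("aladdin_adx_issuer")
    | ToyDataset("aladdin_adx_security")
    | ToyDataset("aladdin_adx_shares_outstanding")
)

print(schedule.is_satisfied({"aladdin_adx_factor"}))  # True: one update is enough
print(schedule.is_satisfied(set()))                   # False: nothing updated yet
```

By contrast, a plain list such as `schedule=[a, b]` means every listed dataset must be updated before the DAG runs.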
   
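Even if several of these datasets are updated at around the same time, the DAG is triggered only once. Airflow queues the dataset events for the consumer DAG, and when the scheduler next finds the `|` condition satisfied, it creates a single DAG run that consumes all of the events queued since the previous dataset-triggered run. The rule is effectively: "if one or more of them have been updated, run once."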
   
It does not matter whether one dataset or all five changed in that window; the result is a single run.
   
An optional analogy: think of a doorbell wired to five buttons. Pressing one button, or several at the same moment, rings the bell just once.
   
That's why the updates appear to be grouped.
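
The grouping can be sketched with a toy queue. This is a simplified model, not Airflow's scheduler code: `ToyConsumerQueue`, `record_update`, and `scheduler_tick` are hypothetical names, and the real scheduler tracks queued dataset events in its metadata database rather than in a Python list. It shows three updates landing between two scheduler checks and producing one run, followed by a later update producing a second run.

```python
from dataclasses import dataclass, field


@dataclass
class ToyConsumerQueue:
    """Hypothetical model of per-DAG dataset event queueing (not Airflow code)."""
    watched: frozenset
    pending: list = field(default_factory=list)
    runs: list = field(default_factory=list)

    def record_update(self, dataset_uri):
        # A producer updated a dataset; queue the event only if this DAG watches it
        if dataset_uri in self.watched:
            self.pending.append(dataset_uri)

    def scheduler_tick(self):
        # OR condition: any pending event is enough to start a run
        if self.pending:
            # One run consumes every event queued since the previous run
            self.runs.append(list(self.pending))
            self.pending.clear()


queue = ToyConsumerQueue(frozenset({
    "aladdin_adx_coupon", "aladdin_adx_factor", "aladdin_adx_issuer",
    "aladdin_adx_security", "aladdin_adx_shares_outstanding",
}))

# Three producers finish before the scheduler checks again...
queue.record_update("aladdin_adx_coupon")
queue.record_update("aladdin_adx_factor")
queue.record_update("aladdin_adx_issuer")
queue.scheduler_tick()

# ...then one more producer finishes later
queue.record_update("aladdin_adx_security")
queue.scheduler_tick()

print(len(queue.runs))  # 2
print(queue.runs[0])    # ['aladdin_adx_coupon', 'aladdin_adx_factor', 'aladdin_adx_issuer']
```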
   
**Making one DAG process different files depending on which dataset URI was updated**
   
You can pass the dataset information **from the producer DAG** (the DAG that updates the dataset) **to the consumer DAG** (the DAG that consumes it) **using the `conf` parameter of `TriggerDagRunOperator` in the producer DAG**:
   
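```python
# Airflow 2.x import path
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Defined inside the producer DAG (e.g. within its `with DAG(...)` block)
trigger_consumer = TriggerDagRunOperator(
    task_id="trigger_consumer_from_producer_a",
    trigger_dag_id="consumer_triggered_dag",  # ID of the consumer DAG
    conf={
        # conf is a templated field, so the Jinja expressions are rendered at runtime
        "source_producer": "producer_dag_trigger_a",
        "producer_run_id": "{{ run_id }}",
        "logical_date": "{{ logical_date.isoformat() }}",
        "data_source_info": "dataset_name",  # name of the dataset this producer updated
    },
    wait_for_completion=False,
)
```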
   
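Because the producer triggers the consumer explicitly, every producer run starts its own consumer run carrying its own `conf`, so those runs are not grouped. The consumer DAG can then read the same values from `dag_run.conf` and perform the required operations for that dataset: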
   
```python
from typing import Any


def _log_trigger_info(**context: Any) -> None:
    """
    Log details about how this DAG run was triggered,
    especially via the 'conf' passed by TriggerDagRunOperator.
    """
    dag_run = context["dag_run"]

    # Access the conf dictionary passed by the TriggerDagRunOperator
    triggered_by_conf = dag_run.conf

    if triggered_by_conf:
        print("\n--- Triggering Info from Producer DAG ---")
        print(f"  Source Producer: {triggered_by_conf.get('source_producer', 'N/A')}")
        print(f"  Producer Run ID: {triggered_by_conf.get('producer_run_id', 'N/A')}")
        print(f"  Producer Logical Date: {triggered_by_conf.get('logical_date', 'N/A')}")
        print(f"  Data Source Info: {triggered_by_conf.get('data_source_info', 'N/A')}")  # dataset_name

        # Perform the required operation based on the dataset here
    else:
        print("\n--- No 'conf' data passed to this triggered run ---")
        print("(This likely means it was triggered manually without conf, or by its time or dataset schedule)")

    print("Consumer: Starting data processing based on received trigger info...")


# Hook it into the consumer DAG, for example (task_id is illustrative):
# PythonOperator(task_id="log_trigger_info", python_callable=_log_trigger_info)
```
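
For the "perform required operation based on dataset" step, one option is a dispatch table keyed on `data_source_info`. This is a sketch under assumptions: it assumes each producer sets `data_source_info` to the actual dataset name (rather than the literal placeholder `"dataset_name"`), and `process_coupon`, `process_factor`, `HANDLERS`, and `process_for_dataset` are hypothetical names. A `SimpleNamespace` stands in for the `DagRun` object so the logic can be checked without Airflow.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict


def process_coupon(dataset_name: str) -> str:
    # Hypothetical handler: replace with the real processing for this dataset
    return f"processed coupon files from {dataset_name}"


def process_factor(dataset_name: str) -> str:
    # Hypothetical handler: replace with the real processing for this dataset
    return f"processed factor files from {dataset_name}"


# Hypothetical mapping from the producer's "data_source_info" value to a handler
HANDLERS: Dict[str, Callable[[str], str]] = {
    "aladdin_adx_coupon": process_coupon,
    "aladdin_adx_factor": process_factor,
}


def process_for_dataset(**context: Any) -> str:
    """Choose the processing step from dag_run.conf['data_source_info']."""
    conf = context["dag_run"].conf or {}  # conf can be empty for non-triggered runs
    dataset_name = conf.get("data_source_info")
    handler = HANDLERS.get(dataset_name)
    if handler is None:
        # Fail loudly instead of silently skipping an unknown dataset
        raise ValueError(f"No handler registered for dataset {dataset_name!r}")
    return handler(dataset_name)


# Local check without Airflow: a fake DagRun that only has a .conf attribute
fake_run = SimpleNamespace(conf={"data_source_info": "aladdin_adx_factor"})
print(process_for_dataset(dag_run=fake_run))  # processed factor files from aladdin_adx_factor
```

Keeping one function per dataset makes each processing path easy to test on its own, and an unexpected dataset name causes the task to fail visibly rather than doing nothing.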
   
   
   
   
   
   
   
   
   

