This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f3b44c6813a00f43b307e8d55739419905dca1f7
Author: fritz-astronomer <[email protected]>
AuthorDate: Sat Sep 16 13:07:05 2023 -0400

    Docs for triggered_dataset_event (#34410)
    
    * add templates reference for triggering_dataset_events and a note to check the templates page on the datasets page
    
    * add working example, correct type of triggering_dataset_events
    
    * explain | first | first
    
    (cherry picked from commit 21610c1d6763e61108fae35585536a5c409d5cbc)
---
 .../authoring-and-scheduling/datasets.rst          | 39 ++++++++++++++++++++++
 docs/apache-airflow/templates-ref.rst              |  4 +++
 docs/spelling_wordlist.txt                         |  2 ++
 3 files changed, 45 insertions(+)

diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
index 605635a4e1..60268bedff 100644
--- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
@@ -197,3 +197,42 @@ Notes on schedules
 The ``schedule`` parameter to your DAG can take either a list of datasets to consume or a timetable-based option. The two cannot currently be mixed.
 
 When using datasets, in this first release (v2.4) waiting for all datasets in the list to be updated is the only option when multiple datasets are consumed by a DAG. A later release may introduce more fine-grained options allowing for greater flexibility.
+
+Fetching information from a Triggering Dataset Event
+----------------------------------------------------
+
+A triggered DAG can fetch information from the Dataset that triggered it using the ``triggering_dataset_events`` template or parameter.
+See more at :ref:`templates-ref`.
+
+Example:
+
+.. code-block:: python
+
+    example_snowflake_dataset = Dataset("snowflake://my_db.my_schema.my_table")
+
+    with DAG(dag_id="load_snowflake_data", schedule="@hourly", ...):
+        SQLExecuteQueryOperator(
+            task_id="load", conn_id="snowflake_default", outlets=[example_snowflake_dataset], ...
+        )
+
+    with DAG(dag_id="query_snowflake_data", schedule=[example_snowflake_dataset], ...):
+        SQLExecuteQueryOperator(
+            task_id="query",
+            conn_id="snowflake_default",
+            sql="""
+              SELECT *
+              FROM my_db.my_schema.my_table
+              WHERE "updated_at" >= '{{ (triggering_dataset_events.values() | first | first).source_dag_run.data_interval_start }}'
+              AND "updated_at" < '{{ (triggering_dataset_events.values() | first | first).source_dag_run.data_interval_end }}';
+            """,
+        )
+
+        @task
+        def print_triggering_dataset_events(triggering_dataset_events=None):
+            for dataset, dataset_list in triggering_dataset_events.items():
+                print(dataset, dataset_list)
+                print(dataset_list[0].source_dag_run.dag_run_id)
+
+        print_triggering_dataset_events()
+
+Note that this example uses `(.values() | first | first) <https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_
+to fetch the first of one Dataset given to the DAG, and the first of one DatasetEvent for that Dataset.
+The logic can become considerably more complex if the DAG consumes multiple Datasets, potentially with multiple DatasetEvents each.
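For the multiple-Dataset case the note above alludes to, here is a rough sketch (not part of this commit) of a task-side approach: instead of ``| first | first``, iterate over every entry of ``triggering_dataset_events`` and combine the data intervals of all triggering runs. The helper name ``combined_interval`` and the ``SimpleNamespace`` stand-ins for DAG-run/event objects are hypothetical, used only so the sketch runs without Airflow; the dict shape (Dataset URI mapped to a list of events, each with a ``source_dag_run`` carrying ``data_interval_start``/``data_interval_end``) mirrors what the docs describe.

```python
# Sketch, assuming the dict-of-lists shape Airflow documents for
# ``triggering_dataset_events``; the objects here are plain stand-ins.
from datetime import datetime
from types import SimpleNamespace


def combined_interval(triggering_dataset_events):
    """Return (earliest start, latest end) across all triggering events."""
    starts, ends = [], []
    for uri, events in triggering_dataset_events.items():
        for event in events:
            starts.append(event.source_dag_run.data_interval_start)
            ends.append(event.source_dag_run.data_interval_end)
    return min(starts), max(ends)


def _event(start, end):
    # Hypothetical stand-in mimicking a DatasetEvent with a source_dag_run.
    return SimpleNamespace(
        source_dag_run=SimpleNamespace(
            data_interval_start=start, data_interval_end=end
        )
    )


events = {
    "snowflake://my_db.my_schema.my_table": [
        _event(datetime(2023, 9, 16, 12), datetime(2023, 9, 16, 13)),
        _event(datetime(2023, 9, 16, 13), datetime(2023, 9, 16, 14)),
    ],
    "s3://bucket/other_table": [
        _event(datetime(2023, 9, 16, 11), datetime(2023, 9, 16, 12)),
    ],
}

start, end = combined_interval(events)
print(start, end)  # 2023-09-16 11:00:00 2023-09-16 14:00:00
```

The widened window covers every triggering run, which is one plausible policy; depending on the use case you might instead process each event's interval separately.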
diff --git a/docs/apache-airflow/templates-ref.rst b/docs/apache-airflow/templates-ref.rst
index bfb3cad9eb..3cb2f4a93e 100644
--- a/docs/apache-airflow/templates-ref.rst
+++ b/docs/apache-airflow/templates-ref.rst
@@ -74,6 +74,10 @@ Variable                                    Type                  Description
 ``{{ expanded_ti_count }}``                 int | ``None``        | Number of task instances that a mapped task was expanded into. If
                                                                   | the current task is not mapped, this should be ``None``.
                                                                   | Added in version 2.5.
+``{{ triggering_dataset_events }}``         dict[str,             | In a Dataset-scheduled DAG, a map of Dataset URI to a list of triggering :class:`~airflow.models.dataset.DatasetEvent`
+                                            list[DatasetEvent]]   | (there may be more than one, if there are multiple Datasets with different frequencies).
+                                                                  | Read more in :doc:`Datasets <authoring-and-scheduling/datasets>`.
+                                                                  | Added in version 2.4.
 =========================================== ===================== ===================================================================
 
 .. note::
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index fbca728f8e..f8f613dfc3 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -375,6 +375,8 @@ Dataproc
 dataproc
 Dataset
 dataset
+DatasetEvent
+DatasetEvents
 datasetId
 Datasets
 datasets
