This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f3b44c6813a00f43b307e8d55739419905dca1f7
Author: fritz-astronomer <[email protected]>
AuthorDate: Sat Sep 16 13:07:05 2023 -0400

    Docs for triggered_dataset_event (#34410)

    * add templates reference for triggering_dataset_events and a note to check the templates page on the datasets page

    * add working example, correct type of triggering_dataset_events

    * explain | first | first

    (cherry picked from commit 21610c1d6763e61108fae35585536a5c409d5cbc)
---
 .../authoring-and-scheduling/datasets.rst          | 39 ++++++++++++++++++++++
 docs/apache-airflow/templates-ref.rst              |  4 +++
 docs/spelling_wordlist.txt                         |  2 ++
 3 files changed, 45 insertions(+)

diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
index 605635a4e1..60268bedff 100644
--- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
@@ -197,3 +197,42 @@ Notes on schedules
 The ``schedule`` parameter to your DAG can take either a list of datasets to consume or a timetable-based option. The two cannot currently be mixed.
 
 When using datasets, in this first release (v2.4) waiting for all datasets in the list to be updated is the only option when multiple datasets are consumed by a DAG. A later release may introduce more fine-grained options allowing for greater flexibility.
+
+Fetching information from a Triggering Dataset Event
+----------------------------------------------------
+
+A triggered DAG can fetch information from the Dataset that triggered it using the ``triggering_dataset_events`` template or parameter.
+See more at :ref:`templates-ref`.
+
+Example:
+
+.. code-block:: python
+
+    example_snowflake_dataset = Dataset("snowflake://my_db.my_schema.my_table")
+
+    with DAG(dag_id="load_snowflake_data", schedule="@hourly", ...):
+        SQLExecuteQueryOperator(
+            task_id="load", conn_id="snowflake_default", outlets=[example_snowflake_dataset], ...
+        )
+
+    with DAG(dag_id="query_snowflake_data", schedule=[example_snowflake_dataset], ...):
+        SQLExecuteQueryOperator(
+            task_id="query",
+            conn_id="snowflake_default",
+            sql="""
+              SELECT *
+              FROM my_db.my_schema.my_table
+              WHERE "updated_at" >= '{{ (triggering_dataset_events.values() | first | first).source_dag_run.data_interval_start }}'
+              AND "updated_at" < '{{ (triggering_dataset_events.values() | first | first).source_dag_run.data_interval_end }}';
+            """,
+        )
+
+        @task
+        def print_triggering_dataset_events(triggering_dataset_events=None):
+            for dataset, dataset_list in triggering_dataset_events.items():
+                print(dataset, dataset_list)
+                print(dataset_list[0].source_dag_run.run_id)
+
+        print_triggering_dataset_events()
+
+Note that this example uses `(.values() | first | first) <https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ to fetch the list of DatasetEvents for the first Dataset given to the DAG, and then the first DatasetEvent in that list. An implementation may be considerably more complex if you have multiple Datasets, each potentially with multiple DatasetEvents.
diff --git a/docs/apache-airflow/templates-ref.rst b/docs/apache-airflow/templates-ref.rst
index bfb3cad9eb..3cb2f4a93e 100644
--- a/docs/apache-airflow/templates-ref.rst
+++ b/docs/apache-airflow/templates-ref.rst
@@ -74,6 +74,10 @@ Variable                                    Type                  Description
 ``{{ expanded_ti_count }}``                 int | ``None``        | Number of task instances that a mapped task was expanded into. If
                                                                   | the current task is not mapped, this should be ``None``.
                                                                   | Added in version 2.5.
+``{{ triggering_dataset_events }}``         dict[str,             | If in a Dataset Scheduled DAG, a map of Dataset URI to a list of triggering :class:`~airflow.models.dataset.DatasetEvent`
+                                            list[DatasetEvent]]   | (there may be more than one, if there are multiple Datasets with different frequencies).
+                                                                  | Read more here :doc:`Datasets <authoring-and-scheduling/datasets>`.
+                                                                  | Added in version 2.4.
 =========================================== ===================== ===================================================================
 
 .. note::
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index fbca728f8e..f8f613dfc3 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -375,6 +375,8 @@ Dataproc
 dataproc
 Dataset
 dataset
+DatasetEvent
+DatasetEvents
 datasetId
 Datasets
 datasets
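
To make the `| first | first` chain in the `query_snowflake_data` example concrete, here is a minimal stdlib-only Python sketch of what it resolves to. `first` and `first_event_interval` are hypothetical helpers, and `SimpleNamespace` objects stand in for Airflow's DatasetEvent and DagRun; only the attribute names `source_dag_run`, `data_interval_start` and `data_interval_end` come from the example. Which Dataset counts as "first" depends on how Airflow builds the dict, so with several Datasets that choice may be arbitrary.

```python
from datetime import datetime, timedelta
from types import SimpleNamespace


def first(iterable):
    """Rough stand-in for Jinja's ``first`` filter: the first item of any iterable.

    Unlike Jinja, which returns an Undefined value for an empty sequence,
    this raises StopIteration.
    """
    return next(iter(iterable))


def first_event_interval(triggering_dataset_events):
    """Python equivalent of ``(triggering_dataset_events.values() | first | first)``,
    followed by reading the source DAG run's data interval."""
    # Outer first: the event list of the first Dataset; inner first: its first event.
    event = first(first(triggering_dataset_events.values()))
    run = event.source_dag_run
    return run.data_interval_start, run.data_interval_end


# Hypothetical data shaped like dict[str, list[DatasetEvent]]; the SimpleNamespace
# objects model only the attributes read above.
start = datetime(2023, 9, 16, 12)
run = SimpleNamespace(data_interval_start=start, data_interval_end=start + timedelta(hours=1))
events = {"snowflake://my_db.my_schema.my_table": [SimpleNamespace(source_dag_run=run)]}

print(first_event_interval(events))
```

In the DAG itself, the Jinja expression does this lookup at render time, so the SQL receives the upstream run's interval boundaries as strings.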
