This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 44c37e4eeefcc62adb9a19ab101f7630f36f5de6 Author: Tzu-ping Chung <t...@astronomer.io> AuthorDate: Thu Oct 21 01:26:08 2021 +0800 Change `ds`, `ts`, etc. back to use logical date (#19088) (cherry picked from commit 25a50bb1fbf0e228706c7927cb36570921881adb) --- airflow/models/taskinstance.py | 43 +++++++++++++++++----------- docs/apache-airflow/templates-ref.rst | 20 +++++++------ tests/utils/log/test_log_reader.py | 54 ++++++++++++++++++++++++++++++++++- 3 files changed, 91 insertions(+), 26 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 46945c7..71ab107 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -27,7 +27,7 @@ from collections import defaultdict from datetime import datetime, timedelta from functools import partial from tempfile import NamedTemporaryFile -from typing import IO, TYPE_CHECKING, Any, Dict, Iterable, List, NamedTuple, Optional, Tuple, Union +from typing import IO, TYPE_CHECKING, Any, Callable, Dict, Iterable, List, NamedTuple, Optional, Tuple, Union from urllib.parse import quote import dill @@ -1788,17 +1788,16 @@ class TaskInstance(Base, LoggingMixin): if conf.getboolean('core', 'dag_run_conf_overrides_params'): self.overwrite_params_with_dag_run_conf(params=params, dag_run=dag_run) - interval_start = dag.get_run_data_interval(dag_run).start - ds = interval_start.strftime('%Y-%m-%d') + logical_date = timezone.coerce_datetime(self.execution_date) + ds = logical_date.strftime('%Y-%m-%d') + ds_nodash = ds.replace('-', '') + ts = logical_date.isoformat() + ts_nodash = logical_date.strftime('%Y%m%dT%H%M%S') + ts_nodash_with_tz = ts.replace('-', '').replace(':', '') # Now validates Params and convert them into a simple dict task.params = params.validate() - ds_nodash = ds.replace('-', '') - ts = interval_start.isoformat() - ts_nodash = interval_start.strftime('%Y%m%dT%H%M%S') - ts_nodash_with_tz = ts.replace('-', '').replace(':', '') - @cache # Prevent multiple 
database access. def _get_previous_dagrun_success() -> Optional["DagRun"]: return self.get_previous_dagrun(state=State.SUCCESS, session=session) @@ -1910,14 +1909,23 @@ class TaskInstance(Base, LoggingMixin): # Create lazy proxies for deprecated stuff. - def deprecated_proxy(func, *, key, replacement=None) -> lazy_object_proxy.Proxy: + def deprecated_proxy( + func: Callable[[], Any], + *, + key: str, + replacements: Optional[List[str]] = None, + ) -> lazy_object_proxy.Proxy: def deprecated_func(): message = ( f"Accessing {key!r} from the template is deprecated and " f"will be removed in a future version." ) - if replacement: - message += f" Please use {replacement!r} instead." + if replacements: + display_except_last = ", ".join(repr(r) for r in replacements[:-1]) + if display_except_last: + message += f" Please use {display_except_last} or {replacements[-1]!r} instead." + else: + message += f" Please use {replacements[-1]!r} instead." warnings.warn(message, DeprecationWarning) return func() @@ -1995,22 +2003,23 @@ class TaskInstance(Base, LoggingMixin): 'ds': ds, 'ds_nodash': ds_nodash, 'execution_date': deprecated_proxy( - lambda: timezone.coerce_datetime(self.execution_date), + lambda: logical_date, key='execution_date', - replacement='data_interval_start', + replacements=['logical_date', 'data_interval_start'], ), 'inlets': task.inlets, + 'logical_date': logical_date, 'macros': macros, - 'next_ds': deprecated_proxy(get_next_ds, key="next_ds", replacement="data_interval_end | ds"), + 'next_ds': deprecated_proxy(get_next_ds, key="next_ds", replacements=["data_interval_end | ds"]), 'next_ds_nodash': deprecated_proxy( get_next_ds_nodash, key="next_ds_nodash", - replacement="data_interval_end | ds_nodash", + replacements=["data_interval_end | ds_nodash"], ), 'next_execution_date': deprecated_proxy( get_next_execution_date, key='next_execution_date', - replacement='data_interval_end', + replacements=['data_interval_end'], ), 'outlets': task.outlets, 'params': 
task.params, @@ -2022,7 +2031,7 @@ class TaskInstance(Base, LoggingMixin): 'prev_execution_date_success': deprecated_proxy( lambda: self.get_previous_execution_date(state=State.SUCCESS, session=session), key='prev_execution_date_success', - replacement='prev_data_interval_start_success', + replacements=['prev_data_interval_start_success'], ), 'prev_start_date_success': lazy_object_proxy.Proxy(get_prev_start_date_success), 'run_id': self.run_id, diff --git a/docs/apache-airflow/templates-ref.rst b/docs/apache-airflow/templates-ref.rst index bc97315..a74c0aa 100644 --- a/docs/apache-airflow/templates-ref.rst +++ b/docs/apache-airflow/templates-ref.rst @@ -38,15 +38,14 @@ Variable Description ========================================== ==================================== ``{{ data_interval_start }}`` Start of the data interval (`pendulum.DateTime`_). ``{{ data_interval_end }}`` End of the data interval (`pendulum.DateTime`_). -``{{ ds }}`` Start of the data interval as ``YYYY-MM-DD``. - Same as ``{{ data_interval_start | ds }}``. -``{{ ds_nodash }}`` Start of the data interval as ``YYYYMMDD``. - Same as ``{{ data_interval_start | ds_nodash }}``. -``{{ ts }}`` Same as ``{{ data_interval_start | ts }}``. +``{{ ds }}`` The DAG run's logical date as ``YYYY-MM-DD``. + Same as ``{{ dag_run.logical_date | ds }}``. +``{{ ds_nodash }}`` Same as ``{{ dag_run.logical_date | ds_nodash }}``. +``{{ ts }}`` Same as ``{{ dag_run.logical_date | ts }}``. Example: ``2018-01-01T00:00:00+00:00``. -``{{ ts_nodash_with_tz }}`` Same as ``{{ data_interval_start | ts_nodash_with_tz }}``. +``{{ ts_nodash_with_tz }}`` Same as ``{{ dag_run.logical_date | ts_nodash_with_tz }}``. Example: ``20180101T000000+0000``. -``{{ ts_nodash }}`` Same as ``{{ data_interval_start | ts_nodash }}``. +``{{ ts_nodash }}`` Same as ``{{ dag_run.logical_date | ts_nodash }}``. Example: ``20180101T000000``. 
``{{ prev_data_interval_start_success }}`` Start of the data interval from prior successful DAG run (`pendulum.DateTime`_ or ``None``). @@ -78,13 +77,18 @@ Variable Description subcommand. ========================================== ==================================== +.. note:: + + The DAG run's logical date, and values derived from it, such as ``ds`` and + ``ts``, **should not** be considered unique in a DAG. Use ``run_id`` instead. + The following variables are deprecated. They are kept for backward compatibility, but you should convert existing code to use other variables instead. ===================================== ==================================== Deprecated Variable Description ===================================== ==================================== -``{{ execution_date }}`` the execution date (logical date), same as ``logical_date`` +``{{ execution_date }}`` the execution date (logical date), same as ``dag_run.logical_date`` ``{{ next_execution_date }}`` the next execution date (if available) (`pendulum.DateTime`_) if ``{{ execution_date }}`` is ``2018-01-01 00:00:00`` and ``schedule_interval`` is ``@weekly``, ``{{ next_execution_date }}`` diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py index f891b2f..a3bb9b7 100644 --- a/tests/utils/log/test_log_reader.py +++ b/tests/utils/log/test_log_reader.py @@ -16,16 +16,21 @@ # under the License. 
import copy +import datetime import logging import os import sys import tempfile from unittest import mock +import pendulum import pytest from airflow import settings from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG +from airflow.models import DagRun +from airflow.operators.python import PythonOperator +from airflow.timetables.base import DataInterval from airflow.utils import timezone from airflow.utils.log.log_reader import TaskLogReader from airflow.utils.log.logging_mixin import ExternalLoggingMixin @@ -87,7 +92,7 @@ class TestLogView: f.flush() @pytest.fixture(autouse=True) - def prepare_db(self, session, create_task_instance): + def prepare_db(self, create_task_instance): ti = create_task_instance( dag_id=self.DAG_ID, task_id=self.TASK_ID, @@ -238,3 +243,50 @@ class TestLogView: mock_prop.return_value = True assert task_log_reader.supports_external_link + + def test_task_log_filename_unique(self, dag_maker): + """Ensure the default log_filename_template produces a unique filename. + + See discussion in apache/airflow#19058 [1]_ for how uniqueness may + change in a future Airflow release. For now, the logical date is used + to distinguish DAG runs. This test should be modified when the logical + date is no longer used to ensure uniqueness. + + [1]: https://github.com/apache/airflow/issues/19058 + """ + dag_id = "test_task_log_filename_ts_corresponds_to_logical_date" + task_id = "echo_run_type" + + def echo_run_type(dag_run: DagRun, **kwargs): + print(dag_run.run_type) + + with dag_maker(dag_id, start_date=self.DEFAULT_DATE, schedule_interval="@daily") as dag: + PythonOperator(task_id=task_id, python_callable=echo_run_type) + + start = pendulum.datetime(2021, 1, 1) + end = start + datetime.timedelta(days=1) + trigger_time = end + datetime.timedelta(hours=4, minutes=29) # Arbitrary. + + # Create two DAG runs that have the same data interval, but not the same + # execution date, to check if they correctly use different log files. 
+ scheduled_dagrun: DagRun = dag_maker.create_dagrun( + run_type=DagRunType.SCHEDULED, + execution_date=start, + data_interval=DataInterval(start, end), + ) + manual_dagrun: DagRun = dag_maker.create_dagrun( + run_type=DagRunType.MANUAL, + execution_date=trigger_time, + data_interval=DataInterval(start, end), + ) + + scheduled_ti = scheduled_dagrun.get_task_instance(task_id) + manual_ti = manual_dagrun.get_task_instance(task_id) + assert scheduled_ti is not None + assert manual_ti is not None + + scheduled_ti.refresh_from_task(dag.get_task(task_id)) + manual_ti.refresh_from_task(dag.get_task(task_id)) + + reader = TaskLogReader() + assert reader.render_log_filename(scheduled_ti, 1) != reader.render_log_filename(manual_ti, 1)