This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 44c37e4eeefcc62adb9a19ab101f7630f36f5de6
Author: Tzu-ping Chung <t...@astronomer.io>
AuthorDate: Thu Oct 21 01:26:08 2021 +0800

    Change `ds`, `ts`, etc. back to use logical date (#19088)
    
    (cherry picked from commit 25a50bb1fbf0e228706c7927cb36570921881adb)
---
 airflow/models/taskinstance.py        | 43 +++++++++++++++++-----------
 docs/apache-airflow/templates-ref.rst | 20 +++++++------
 tests/utils/log/test_log_reader.py    | 54 ++++++++++++++++++++++++++++++++++-
 3 files changed, 91 insertions(+), 26 deletions(-)
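
For context before the diff: the change derives all of the date/time template values (`ds`, `ts`, and friends) from the DAG run's logical date instead of the data interval start. A minimal standalone sketch of the derivation, mirroring the new code in taskinstance.py below but using pendulum directly rather than Airflow's timezone helpers (the example date is illustrative):

    import pendulum

    # Stand-in for timezone.coerce_datetime(self.execution_date) below.
    logical_date = pendulum.datetime(2021, 10, 21, tz="UTC")

    ds = logical_date.strftime('%Y-%m-%d')                    # '2021-10-21'
    ds_nodash = ds.replace('-', '')                           # '20211021'
    ts = logical_date.isoformat()                             # '2021-10-21T00:00:00+00:00'
    ts_nodash = logical_date.strftime('%Y%m%dT%H%M%S')        # '20211021T000000'
    ts_nodash_with_tz = ts.replace('-', '').replace(':', '')  # '20211021T000000+0000'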

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 46945c7..71ab107 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -27,7 +27,7 @@ from collections import defaultdict
 from datetime import datetime, timedelta
 from functools import partial
 from tempfile import NamedTemporaryFile
-from typing import IO, TYPE_CHECKING, Any, Dict, Iterable, List, NamedTuple, Optional, Tuple, Union
+from typing import IO, TYPE_CHECKING, Any, Callable, Dict, Iterable, List, NamedTuple, Optional, Tuple, Union
 from urllib.parse import quote
 
 import dill
@@ -1788,17 +1788,16 @@ class TaskInstance(Base, LoggingMixin):
         if conf.getboolean('core', 'dag_run_conf_overrides_params'):
             self.overwrite_params_with_dag_run_conf(params=params, dag_run=dag_run)
 
-        interval_start = dag.get_run_data_interval(dag_run).start
-        ds = interval_start.strftime('%Y-%m-%d')
+        logical_date = timezone.coerce_datetime(self.execution_date)
+        ds = logical_date.strftime('%Y-%m-%d')
+        ds_nodash = ds.replace('-', '')
+        ts = logical_date.isoformat()
+        ts_nodash = logical_date.strftime('%Y%m%dT%H%M%S')
+        ts_nodash_with_tz = ts.replace('-', '').replace(':', '')
 
         # Now validates Params and convert them into a simple dict
         task.params = params.validate()
 
-        ds_nodash = ds.replace('-', '')
-        ts = interval_start.isoformat()
-        ts_nodash = interval_start.strftime('%Y%m%dT%H%M%S')
-        ts_nodash_with_tz = ts.replace('-', '').replace(':', '')
-
         @cache  # Prevent multiple database access.
         def _get_previous_dagrun_success() -> Optional["DagRun"]:
             return self.get_previous_dagrun(state=State.SUCCESS, session=session)
@@ -1910,14 +1909,23 @@ class TaskInstance(Base, LoggingMixin):
 
         # Create lazy proxies for deprecated stuff.
 
-        def deprecated_proxy(func, *, key, replacement=None) -> lazy_object_proxy.Proxy:
+        def deprecated_proxy(
+            func: Callable[[], Any],
+            *,
+            key: str,
+            replacements: Optional[List[str]] = None,
+        ) -> lazy_object_proxy.Proxy:
             def deprecated_func():
                 message = (
                     f"Accessing {key!r} from the template is deprecated and "
                     f"will be removed in a future version."
                 )
-                if replacement:
-                    message += f" Please use {replacement!r} instead."
+                if replacements:
+                    display_except_last = ", ".join(repr(r) for r in replacements[:-1])
+                    if display_except_last:
+                        message += f" Please use {display_except_last} or {replacements[-1]!r} instead."
+                    else:
+                        message += f" Please use {replacements[-1]!r} instead."
                 warnings.warn(message, DeprecationWarning)
                 return func()
 
@@ -1995,22 +2003,23 @@ class TaskInstance(Base, LoggingMixin):
             'ds': ds,
             'ds_nodash': ds_nodash,
             'execution_date': deprecated_proxy(
-                lambda: timezone.coerce_datetime(self.execution_date),
+                lambda: logical_date,
                 key='execution_date',
-                replacement='data_interval_start',
+                replacements=['logical_date', 'data_interval_start'],
             ),
             'inlets': task.inlets,
+            'logical_date': logical_date,
             'macros': macros,
-            'next_ds': deprecated_proxy(get_next_ds, key="next_ds", replacement="data_interval_end | ds"),
+            'next_ds': deprecated_proxy(get_next_ds, key="next_ds", replacements=["data_interval_end | ds"]),
             'next_ds_nodash': deprecated_proxy(
                 get_next_ds_nodash,
                 key="next_ds_nodash",
-                replacement="data_interval_end | ds_nodash",
+                replacements=["data_interval_end | ds_nodash"],
             ),
             'next_execution_date': deprecated_proxy(
                 get_next_execution_date,
                 key='next_execution_date',
-                replacement='data_interval_end',
+                replacements=['data_interval_end'],
             ),
             'outlets': task.outlets,
             'params': task.params,
@@ -2022,7 +2031,7 @@ class TaskInstance(Base, LoggingMixin):
             'prev_execution_date_success': deprecated_proxy(
                 lambda: self.get_previous_execution_date(state=State.SUCCESS, 
session=session),
                 key='prev_execution_date_success',
-                replacement='prev_data_interval_start_success',
+                replacements=['prev_data_interval_start_success'],
             ),
             'prev_start_date_success': lazy_object_proxy.Proxy(get_prev_start_date_success),
             'run_id': self.run_id,
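
A note on the `deprecated_proxy` pattern in the hunk above: because each value is wrapped in a lazy_object_proxy.Proxy, the DeprecationWarning only fires when a template actually touches the deprecated variable. A self-contained sketch of the idea (simplified from the code above; the message formatting differs slightly from the exact Airflow source):

    import warnings

    import lazy_object_proxy

    def deprecated_proxy(func, *, key, replacements=None):
        def deprecated_func():
            message = f"Accessing {key!r} from the template is deprecated."
            if replacements:
                options = " or ".join(repr(r) for r in replacements)
                message += f" Please use {options} instead."
            warnings.warn(message, DeprecationWarning)
            return func()

        # Proxy defers deprecated_func until the value is first used.
        return lazy_object_proxy.Proxy(deprecated_func)

    value = deprecated_proxy(lambda: 42, key="answer", replacements=["answer_v2"])
    # No warning yet; it fires here, on first real access:
    print(value + 1)  # 43
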
diff --git a/docs/apache-airflow/templates-ref.rst b/docs/apache-airflow/templates-ref.rst
index bc97315..a74c0aa 100644
--- a/docs/apache-airflow/templates-ref.rst
+++ b/docs/apache-airflow/templates-ref.rst
@@ -38,15 +38,14 @@ Variable                                    Description
 ==========================================  ====================================
 ``{{ data_interval_start }}``               Start of the data interval (`pendulum.DateTime`_).
 ``{{ data_interval_end }}``                 End of the data interval (`pendulum.DateTime`_).
-``{{ ds }}``                                Start of the data interval as ``YYYY-MM-DD``.
-                                            Same as ``{{ data_interval_start | ds }}``.
-``{{ ds_nodash }}``                         Start of the data interval as ``YYYYMMDD``.
-                                            Same as ``{{ data_interval_start | ds_nodash }}``.
-``{{ ts }}``                                Same as ``{{ data_interval_start | ts }}``.
+``{{ ds }}``                                The DAG run's logical date as ``YYYY-MM-DD``.
+                                            Same as ``{{ dag_run.logical_date | ds }}``.
+``{{ ds_nodash }}``                         Same as ``{{ dag_run.logical_date | ds_nodash }}``.
+``{{ ts }}``                                Same as ``{{ dag_run.logical_date | ts }}``.
                                             Example: ``2018-01-01T00:00:00+00:00``.
-``{{ ts_nodash_with_tz }}``                 Same as ``{{ data_interval_start | ts_nodash_with_tz }}``.
+``{{ ts_nodash_with_tz }}``                 Same as ``{{ dag_run.logical_date | ts_nodash_with_tz }}``.
                                             Example: ``20180101T000000+0000``.
-``{{ ts_nodash }}``                         Same as ``{{ data_interval_start | ts_nodash }}``.
+``{{ ts_nodash }}``                         Same as ``{{ dag_run.logical_date | ts_nodash }}``.
                                             Example: ``20180101T000000``.
 ``{{ prev_data_interval_start_success }}``  Start of the data interval from prior successful DAG run
                                             (`pendulum.DateTime`_ or ``None``).
@@ -78,13 +77,18 @@ Variable                                    Description
                                             subcommand.
 ==========================================  ====================================
 
+.. note::
+
+    The DAG run's logical date, and values derived from it, such as ``ds`` and
+    ``ts``, **should not** be considered unique in a DAG. Use ``run_id`` instead.
+
 The following variables are deprecated. They are kept for backward compatibility, but you should convert
 existing code to use other variables instead.
 
 =====================================   ====================================
 Deprecated Variable                     Description
 =====================================   ====================================
-``{{ execution_date }}``                the execution date (logical date), same as ``logical_date``
+``{{ execution_date }}``                the execution date (logical date), same as ``dag_run.logical_date``
 ``{{ next_execution_date }}``           the next execution date (if available) (`pendulum.DateTime`_)
                                         if ``{{ execution_date }}`` is ``2018-01-01 00:00:00`` and
                                         ``schedule_interval`` is ``@weekly``, ``{{ next_execution_date }}``
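
To make the documented distinction concrete, here is an illustrative DAG (the dag_id and task name are made up) contrasting the logical-date value of `ds` with the data interval, and using `run_id` where a unique per-run identifier is needed, as the new note advises:

    import pendulum

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="template_values_demo",  # hypothetical
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule_interval="@daily",
    ) as dag:
        BashOperator(
            task_id="show_dates",
            bash_command=(
                "echo logical={{ ds }} "                    # logical date
                "interval={{ data_interval_start | ds }} "  # interval start
                "unique={{ run_id }}"                       # unique per run
            ),
        )

For a scheduled run the two dates coincide, but for a manually triggered run `ds` follows the trigger time while `data_interval_start` keeps the interval boundary.
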
diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py
index f891b2f..a3bb9b7 100644
--- a/tests/utils/log/test_log_reader.py
+++ b/tests/utils/log/test_log_reader.py
@@ -16,16 +16,21 @@
 # under the License.
 
 import copy
+import datetime
 import logging
 import os
 import sys
 import tempfile
 from unittest import mock
 
+import pendulum
 import pytest
 
 from airflow import settings
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.models import DagRun
+from airflow.operators.python import PythonOperator
+from airflow.timetables.base import DataInterval
 from airflow.utils import timezone
 from airflow.utils.log.log_reader import TaskLogReader
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin
@@ -87,7 +92,7 @@ class TestLogView:
                 f.flush()
 
     @pytest.fixture(autouse=True)
-    def prepare_db(self, session, create_task_instance):
+    def prepare_db(self, create_task_instance):
         ti = create_task_instance(
             dag_id=self.DAG_ID,
             task_id=self.TASK_ID,
@@ -238,3 +243,50 @@ class TestLogView:
 
         mock_prop.return_value = True
         assert task_log_reader.supports_external_link
+
+    def test_task_log_filename_unique(self, dag_maker):
+        """Ensure the default log_filename_template produces a unique filename.
+
+        See discussion in apache/airflow#19058 [1]_ for how uniqueness may
+        change in a future Airflow release. For now, the logical date is used
+        to distinguish DAG runs. This test should be modified when the logical
+        date is no longer used to ensure uniqueness.
+
+        [1]: https://github.com/apache/airflow/issues/19058
+        """
+        dag_id = "test_task_log_filename_ts_corresponds_to_logical_date"
+        task_id = "echo_run_type"
+
+        def echo_run_type(dag_run: DagRun, **kwargs):
+            print(dag_run.run_type)
+
+        with dag_maker(dag_id, start_date=self.DEFAULT_DATE, schedule_interval="@daily") as dag:
+            PythonOperator(task_id=task_id, python_callable=echo_run_type)
+
+        start = pendulum.datetime(2021, 1, 1)
+        end = start + datetime.timedelta(days=1)
+        trigger_time = end + datetime.timedelta(hours=4, minutes=29)  # Arbitrary.
+
+        # Create two DAG runs that have the same data interval, but not the same
+        # execution date, to check if they correctly use different log files.
+        scheduled_dagrun: DagRun = dag_maker.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=start,
+            data_interval=DataInterval(start, end),
+        )
+        manual_dagrun: DagRun = dag_maker.create_dagrun(
+            run_type=DagRunType.MANUAL,
+            execution_date=trigger_time,
+            data_interval=DataInterval(start, end),
+        )
+
+        scheduled_ti = scheduled_dagrun.get_task_instance(task_id)
+        manual_ti = manual_dagrun.get_task_instance(task_id)
+        assert scheduled_ti is not None
+        assert manual_ti is not None
+
+        scheduled_ti.refresh_from_task(dag.get_task(task_id))
+        manual_ti.refresh_from_task(dag.get_task(task_id))
+
+        reader = TaskLogReader()
+        assert reader.render_log_filename(scheduled_ti, 1) != reader.render_log_filename(manual_ti, 1)
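
Why the assertion holds: the default log_filename_template in this Airflow version embeds `{{ ts }}`, which after this change is derived from the logical date, and the two runs above deliberately have different logical dates despite sharing a data interval. In miniature (values assume the dates used in the test):

    import pendulum

    start = pendulum.datetime(2021, 1, 1)
    trigger_time = start.add(days=1, hours=4, minutes=29)

    # The `ts` value embedded in each run's log filename:
    scheduled_ts = start.isoformat()      # '2021-01-01T00:00:00+00:00'
    manual_ts = trigger_time.isoformat()  # '2021-01-02T04:29:00+00:00'
    assert scheduled_ts != manual_ts      # hence distinct log paths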
