This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 722a719769 Rename `created_at` to `timestamp` in DatasetEvent (#25292)
722a719769 is described below

commit 722a7197693583e8c0fbc191cdee33f3556baa06
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Mon Jul 25 21:04:37 2022 +0100

    Rename `created_at` to `timestamp` in DatasetEvent (#25292)
    
    Timestamp seems more appropriate
---
 airflow/api_connexion/endpoints/dag_run_endpoint.py  |  6 +++---
 airflow/api_connexion/endpoints/dataset_endpoint.py  |  4 ++--
 airflow/api_connexion/openapi/v1.yaml                |  2 +-
 airflow/api_connexion/schemas/dataset_schema.py      |  2 +-
 .../versions/0114_2_4_0_add_dataset_model.py         |  4 ++--
 airflow/models/dataset.py                            |  7 ++++---
 airflow/www/static/js/datasets/Details.tsx           |  4 ++--
 airflow/www/static/js/types/api-generated.ts         |  2 +-
 .../api_connexion/endpoints/test_dag_run_endpoint.py | 20 +++++++++-----------
 .../api_connexion/endpoints/test_dataset_endpoint.py | 18 +++++++++---------
 tests/api_connexion/schemas/test_dataset_schema.py   | 10 +++++-----
 11 files changed, 39 insertions(+), 40 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index be6feb23f3..30e01eedec 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -139,15 +139,15 @@ def _get_upstream_dataset_events(*, dag_run: DagRun, session: Session) -> List["
 
     dataset_event_filters = [
         DatasetDagRef.dag_id == dag_run.dag_id,
-        DatasetEvent.created_at <= dag_run.execution_date,
+        DatasetEvent.timestamp <= dag_run.execution_date,
     ]
     if previous_dag_run:
-        dataset_event_filters.append(DatasetEvent.created_at > previous_dag_run.execution_date)
+        dataset_event_filters.append(DatasetEvent.timestamp > previous_dag_run.execution_date)
     dataset_events = (
         session.query(DatasetEvent)
         .join(DatasetDagRef, DatasetEvent.dataset_id == DatasetDagRef.dataset_id)
         .filter(*dataset_event_filters)
-        .order_by(DatasetEvent.created_at)
+        .order_by(DatasetEvent.timestamp)
         .all()
     )
     return dataset_events
diff --git a/airflow/api_connexion/endpoints/dataset_endpoint.py b/airflow/api_connexion/endpoints/dataset_endpoint.py
index 5c9e7606fc..0239063ca0 100644
--- a/airflow/api_connexion/endpoints/dataset_endpoint.py
+++ b/airflow/api_connexion/endpoints/dataset_endpoint.py
@@ -72,7 +72,7 @@ def get_dataset_events(
     *,
     limit: int,
     offset: int = 0,
-    order_by: str = "created_at",
+    order_by: str = "timestamp",
     dataset_id: Optional[int] = None,
     source_dag_id: Optional[str] = None,
     source_task_id: Optional[str] = None,
@@ -81,7 +81,7 @@ def get_dataset_events(
     session: Session = NEW_SESSION,
 ) -> APIResponse:
     """Get dataset events"""
-    allowed_attrs = ['source_dag_id', 'source_task_id', 'source_run_id', 'source_map_index', 'created_at']
+    allowed_attrs = ['source_dag_id', 'source_task_id', 'source_run_id', 'source_map_index', 'timestamp']
 
     query = session.query(DatasetEvent)
 
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 7c9d45365e..55c24d871d 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -3555,7 +3555,7 @@ components:
           type: integer
           description: The task map index that updated the dataset.
           nullable: true
-        created_at:
+        timestamp:
           type: string
           description: The dataset event creation time
           nullable: false
diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py
index e63f6ea7eb..06c1dc866d 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -72,7 +72,7 @@ class DatasetEventSchema(SQLAlchemySchema):
     source_dag_id = auto_field()
     source_run_id = auto_field()
     source_map_index = auto_field()
-    created_at = auto_field()
+    timestamp = auto_field()
 
 
 class DatasetEventCollection(NamedTuple):
diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
index deb6c3c33f..8cc5d9dc2e 100644
--- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
+++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
@@ -128,10 +128,10 @@ def _create_dataset_event_table():
         sa.Column('source_dag_id', String(250), nullable=True),
         sa.Column('source_run_id', String(250), nullable=True),
         sa.Column('source_map_index', sa.Integer(), nullable=True, server_default='-1'),
-        sa.Column('created_at', TIMESTAMP, nullable=False),
+        sa.Column('timestamp', TIMESTAMP, nullable=False),
         sqlite_autoincrement=True,  # ensures PK values not reused
     )
-    op.create_index('idx_dataset_id_created_at', 'dataset_event', ['dataset_id', 'created_at'])
+    op.create_index('idx_dataset_id_timestamp', 'dataset_event', ['dataset_id', 'timestamp'])
 
 
 def upgrade():
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index c16f157f8c..21373106b5 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -211,6 +211,7 @@ class DatasetEvent(Base):
     :param source_dag_id: the dag_id of the TI which updated the dataset
     :param source_run_id: the run_id of the TI which updated the dataset
     :param source_map_index: the map_index of the TI which updated the dataset
+    :param timestamp: the time the event was logged
 
     We use relationships instead of foreign keys so that dataset events are not deleted even
     if the foreign key object is.
@@ -223,11 +224,11 @@ class DatasetEvent(Base):
     source_dag_id = Column(StringID(), nullable=True)
     source_run_id = Column(StringID(), nullable=True)
     source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
-    created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
+    timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
 
     __tablename__ = "dataset_event"
     __table_args__ = (
-        Index('idx_dataset_id_created_at', dataset_id, created_at),
+        Index('idx_dataset_id_timestamp', dataset_id, timestamp),
         {'sqlite_autoincrement': True},  # ensures PK values not reused
     )
 
@@ -267,7 +268,7 @@ class DatasetEvent(Base):
 
     def __eq__(self, other) -> bool:
         if isinstance(other, self.__class__):
-            return self.dataset_id == other.dataset_id and self.created_at == other.created_at
+            return self.dataset_id == other.dataset_id and self.timestamp == other.timestamp
         else:
             return NotImplemented
 
diff --git a/airflow/www/static/js/datasets/Details.tsx b/airflow/www/static/js/datasets/Details.tsx
index 3bbee2a818..a99efdd42f 100644
--- a/airflow/www/static/js/datasets/Details.tsx
+++ b/airflow/www/static/js/datasets/Details.tsx
@@ -67,8 +67,8 @@ const DatasetDetails = ({ datasetId, onBack }: Props) => {
   const columns = useMemo(
     () => [
       {
-        Header: 'Created At',
-        accessor: 'createdAt',
+        Header: 'Timestamp',
+        accessor: 'timestamp',
         Cell: TimeCell,
       },
       {
diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts
index c798721eb1..bccbd79a23 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -1499,7 +1499,7 @@ export interface components {
       /** @description The task map index that updated the dataset. */
       source_map_index?: number | null;
       /** @description The dataset event creation time */
-      created_at?: string;
+      timestamp?: string;
     };
     /**
      * @description A collection of dataset events.
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 1a25475df1..3b65450b99 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1543,8 +1543,8 @@ def test__get_upstream_dataset_events_with_prior(configured_app):
     first_timestamp = pendulum.datetime(2022, 1, 1, tz='UTC')
     session.add_all(
         [
-            DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp),
-            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp),
+            DatasetEvent(dataset_id=dataset1a.id, timestamp=first_timestamp),
+            DatasetEvent(dataset_id=dataset1b.id, timestamp=first_timestamp),
         ]
     )
     dr1 = DagRun(
@@ -1557,9 +1557,9 @@ def test__get_upstream_dataset_events_with_prior(configured_app):
     session.add(dr1)
     session.add_all(
         [
-            DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=2000)),
-            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=3000)),
-            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=4000)),
+            DatasetEvent(dataset_id=dataset1a.id, timestamp=first_timestamp.add(microseconds=2000)),
+            DatasetEvent(dataset_id=dataset1b.id, timestamp=first_timestamp.add(microseconds=3000)),
+            DatasetEvent(dataset_id=dataset1b.id, timestamp=first_timestamp.add(microseconds=4000)),
         ]
     )
     dr2 = DagRun(  # this dag run should be ignored
@@ -1578,15 +1578,13 @@ def test__get_upstream_dataset_events_with_prior(configured_app):
     )
     dr3.dag = dag2
     session.add(dr3)
-    session.add_all(
-        [DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=5000))]
-    )
+    session.add_all([DatasetEvent(dataset_id=dataset1a.id, timestamp=first_timestamp.add(microseconds=5000))])
     session.commit()
     session.expunge_all()
 
     events = _get_upstream_dataset_events(dag_run=dr3, session=session)
 
-    event_times = [x.created_at for x in events]
+    event_times = [x.timestamp for x in events]
     assert event_times == [
         first_timestamp.add(microseconds=2000),
         first_timestamp.add(microseconds=3000),
@@ -1612,7 +1610,7 @@ class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
         assert len(result) == 1
         created_at = pendulum.now('UTC')
         # make sure whatever is returned by this func is what comes out in response.
-        d = DatasetEvent(dataset_id=1, created_at=created_at)
+        d = DatasetEvent(dataset_id=1, timestamp=created_at)
         d.dataset = Dataset(id=1, uri='hello', created_at=created_at, updated_at=created_at)
         mock_get_events.return_value = [d]
         response = self.client.get(
@@ -1623,7 +1621,7 @@ class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
         expected_response = {
             'dataset_events': [
                 {
-                    'created_at': str(created_at),
+                    'timestamp': str(created_at),
                     'dataset_id': 1,
                     'dataset_uri': d.dataset.uri,
                     'extra': None,
diff --git a/tests/api_connexion/endpoints/test_dataset_endpoint.py b/tests/api_connexion/endpoints/test_dataset_endpoint.py
index a06142dcbb..0d025d3c6c 100644
--- a/tests/api_connexion/endpoints/test_dataset_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dataset_endpoint.py
@@ -269,7 +269,7 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
             "source_map_index": -1,
         }
 
-        events = [DatasetEvent(id=i, created_at=timezone.parse(self.default_time), **common) for i in [1, 2]]
+        events = [DatasetEvent(id=i, timestamp=timezone.parse(self.default_time), **common) for i in [1, 2]]
         session.add_all(events)
         session.commit()
         assert session.query(DatasetEvent).count() == 2
@@ -282,13 +282,13 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
             "dataset_events": [
                 {
                     "id": 1,
-                    "created_at": self.default_time,
+                    "timestamp": self.default_time,
                     **common,
                     "dataset_uri": d.uri,
                 },
                 {
                     "id": 2,
-                    "created_at": self.default_time,
+                    "timestamp": self.default_time,
                     **common,
                     "dataset_uri": d.uri,
                 },
@@ -328,7 +328,7 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
                 source_task_id=f"task{i}",
                 source_run_id=f"run{i}",
                 source_map_index=i,
-                created_at=timezone.parse(self.default_time),
+                timestamp=timezone.parse(self.default_time),
             )
             for i in [1, 2, 3]
         ]
@@ -353,7 +353,7 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
                     "source_task_id": "task2",
                     "source_run_id": "run2",
                     "source_map_index": 2,
-                    "created_at": self.default_time,
+                    "timestamp": self.default_time,
                 }
             ],
             "total_entries": 1,
@@ -369,7 +369,7 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
                 source_task_id="bar",
                 source_run_id="custom",
                 source_map_index=-1,
-                created_at=timezone.parse(self.default_time),
+                timestamp=timezone.parse(self.default_time),
             )
             for i in [1, 2]
         ]
@@ -425,7 +425,7 @@ class TestGetDatasetEventsEndpointPagination(TestDatasetEndpoint):
                 source_task_id="bar",
                 source_run_id=f"run{i}",
                 source_map_index=-1,
-                created_at=timezone.parse(self.default_time),
+                timestamp=timezone.parse(self.default_time),
             )
             for i in range(1, 10)
         ]
@@ -447,7 +447,7 @@ class TestGetDatasetEventsEndpointPagination(TestDatasetEndpoint):
                 source_task_id="bar",
                 source_run_id=f"run{i}",
                 source_map_index=-1,
-                created_at=timezone.parse(self.default_time),
+                timestamp=timezone.parse(self.default_time),
             )
             for i in range(1, 110)
         ]
@@ -469,7 +469,7 @@ class TestGetDatasetEventsEndpointPagination(TestDatasetEndpoint):
                 source_task_id="bar",
                 source_run_id=f"run{i}",
                 source_map_index=-1,
-                created_at=timezone.parse(self.default_time),
+                timestamp=timezone.parse(self.default_time),
             )
             for i in range(1, 200)
         ]
diff --git a/tests/api_connexion/schemas/test_dataset_schema.py b/tests/api_connexion/schemas/test_dataset_schema.py
index f6ed25b85c..46e2732f2e 100644
--- a/tests/api_connexion/schemas/test_dataset_schema.py
+++ b/tests/api_connexion/schemas/test_dataset_schema.py
@@ -111,7 +111,7 @@ class TestDatasetEventSchema(TestDatasetSchemaBase):
             source_task_id="bar",
             source_run_id="custom",
             source_map_index=-1,
-            created_at=timezone.parse(self.timestamp),
+            timestamp=timezone.parse(self.timestamp),
         )
         session.add(event)
         session.flush()
@@ -125,7 +125,7 @@ class TestDatasetEventSchema(TestDatasetSchemaBase):
             "source_task_id": "bar",
             "source_run_id": "custom",
             "source_map_index": -1,
-            "created_at": self.timestamp,
+            "timestamp": self.timestamp,
         }
 
 
@@ -140,7 +140,7 @@ class TestDatasetEventCollectionSchema(TestDatasetSchemaBase):
             "source_map_index": -1,
         }
 
-        events = [DatasetEvent(id=i, created_at=timezone.parse(self.timestamp), **common) for i in [1, 2]]
+        events = [DatasetEvent(id=i, timestamp=timezone.parse(self.timestamp), **common) for i in [1, 2]]
         session.add_all(events)
         session.flush()
         serialized_data = dataset_event_collection_schema.dump(
@@ -148,8 +148,8 @@ class TestDatasetEventCollectionSchema(TestDatasetSchemaBase):
         )
         assert serialized_data == {
             "dataset_events": [
-                {"id": 1, "created_at": self.timestamp, **common},
-                {"id": 2, "created_at": self.timestamp, **common},
+                {"id": 1, "timestamp": self.timestamp, **common},
+                {"id": 2, "timestamp": self.timestamp, **common},
             ],
             "total_entries": 2,
         }
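
For code that still filters or orders dataset events on the old column, here is a minimal sketch of the post-rename usage. It assumes an Airflow installation that includes this commit; the session helper and the ten-event limit are illustrative choices, not part of the change itself.

    from airflow.models.dataset import DatasetEvent
    from airflow.utils.session import create_session

    with create_session() as session:
        # Order on the renamed column; DatasetEvent.created_at no longer
        # exists after this commit, only DatasetEvent.timestamp does.
        latest_events = (
            session.query(DatasetEvent)
            .order_by(DatasetEvent.timestamp.desc())
            .limit(10)
            .all()
        )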
