This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 722a719769 Rename `created_at` to `timestamp` in DatasetEvent (#25292)
722a719769 is described below

commit 722a7197693583e8c0fbc191cdee33f3556baa06
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Mon Jul 25 21:04:37 2022 +0100

    Rename `created_at` to `timestamp` in DatasetEvent (#25292)

    Timestamp seems more appropriate
---
 airflow/api_connexion/endpoints/dag_run_endpoint.py  |  6 +++---
 airflow/api_connexion/endpoints/dataset_endpoint.py  |  4 ++--
 airflow/api_connexion/openapi/v1.yaml                |  2 +-
 airflow/api_connexion/schemas/dataset_schema.py      |  2 +-
 .../versions/0114_2_4_0_add_dataset_model.py         |  4 ++--
 airflow/models/dataset.py                            |  7 ++++---
 airflow/www/static/js/datasets/Details.tsx           |  4 ++--
 airflow/www/static/js/types/api-generated.ts         |  2 +-
 .../api_connexion/endpoints/test_dag_run_endpoint.py | 20 +++++++++-----------
 .../api_connexion/endpoints/test_dataset_endpoint.py | 18 +++++++++---------
 tests/api_connexion/schemas/test_dataset_schema.py   | 10 +++++-----
 11 files changed, 39 insertions(+), 40 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index be6feb23f3..30e01eedec 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -139,15 +139,15 @@ def _get_upstream_dataset_events(*, dag_run: DagRun, session: Session) -> List["
     dataset_event_filters = [
         DatasetDagRef.dag_id == dag_run.dag_id,
-        DatasetEvent.created_at <= dag_run.execution_date,
+        DatasetEvent.timestamp <= dag_run.execution_date,
     ]
     if previous_dag_run:
-        dataset_event_filters.append(DatasetEvent.created_at > previous_dag_run.execution_date)
+        dataset_event_filters.append(DatasetEvent.timestamp > previous_dag_run.execution_date)
 
     dataset_events = (
         session.query(DatasetEvent)
         .join(DatasetDagRef, DatasetEvent.dataset_id == DatasetDagRef.dataset_id)
         .filter(*dataset_event_filters)
-        .order_by(DatasetEvent.created_at)
+        .order_by(DatasetEvent.timestamp)
         .all()
     )
     return dataset_events
diff --git a/airflow/api_connexion/endpoints/dataset_endpoint.py b/airflow/api_connexion/endpoints/dataset_endpoint.py
index 5c9e7606fc..0239063ca0 100644
--- a/airflow/api_connexion/endpoints/dataset_endpoint.py
+++ b/airflow/api_connexion/endpoints/dataset_endpoint.py
@@ -72,7 +72,7 @@ def get_dataset_events(
     *,
     limit: int,
     offset: int = 0,
-    order_by: str = "created_at",
+    order_by: str = "timestamp",
     dataset_id: Optional[int] = None,
     source_dag_id: Optional[str] = None,
     source_task_id: Optional[str] = None,
@@ -81,7 +81,7 @@
     session: Session = NEW_SESSION,
 ) -> APIResponse:
     """Get dataset events"""
-    allowed_attrs = ['source_dag_id', 'source_task_id', 'source_run_id', 'source_map_index', 'created_at']
+    allowed_attrs = ['source_dag_id', 'source_task_id', 'source_run_id', 'source_map_index', 'timestamp']
 
     query = session.query(DatasetEvent)
 
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 7c9d45365e..55c24d871d 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -3555,7 +3555,7 @@ components:
           type: integer
           description: The task map index that updated the dataset.
           nullable: true
-        created_at:
+        timestamp:
           type: string
           description: The dataset event creation time
           nullable: false
diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py
index e63f6ea7eb..06c1dc866d 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -72,7 +72,7 @@ class DatasetEventSchema(SQLAlchemySchema):
     source_dag_id = auto_field()
     source_run_id = auto_field()
     source_map_index = auto_field()
-    created_at = auto_field()
+    timestamp = auto_field()
 
 
 class DatasetEventCollection(NamedTuple):
diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
index deb6c3c33f..8cc5d9dc2e 100644
--- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
+++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
@@ -128,10 +128,10 @@ def _create_dataset_event_table():
         sa.Column('source_dag_id', String(250), nullable=True),
         sa.Column('source_run_id', String(250), nullable=True),
         sa.Column('source_map_index', sa.Integer(), nullable=True, server_default='-1'),
-        sa.Column('created_at', TIMESTAMP, nullable=False),
+        sa.Column('timestamp', TIMESTAMP, nullable=False),
         sqlite_autoincrement=True,  # ensures PK values not reused
     )
-    op.create_index('idx_dataset_id_created_at', 'dataset_event', ['dataset_id', 'created_at'])
+    op.create_index('idx_dataset_id_timestamp', 'dataset_event', ['dataset_id', 'timestamp'])
 
 
 def upgrade():
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index c16f157f8c..21373106b5 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -211,6 +211,7 @@ class DatasetEvent(Base):
     :param source_dag_id: the dag_id of the TI which updated the dataset
     :param source_run_id: the run_id of the TI which updated the dataset
     :param source_map_index: the map_index of the TI which updated the dataset
+    :param timestamp: the time the event was logged
 
     We use relationships instead of foreign keys so that dataset events are
     not deleted even if the foreign key object is.
@@ -223,11 +224,11 @@ class DatasetEvent(Base):
     source_dag_id = Column(StringID(), nullable=True)
     source_run_id = Column(StringID(), nullable=True)
     source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
-    created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
+    timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
 
     __tablename__ = "dataset_event"
     __table_args__ = (
-        Index('idx_dataset_id_created_at', dataset_id, created_at),
+        Index('idx_dataset_id_timestamp', dataset_id, timestamp),
         {'sqlite_autoincrement': True},  # ensures PK values not reused
     )
 
@@ -267,7 +268,7 @@
 
     def __eq__(self, other) -> bool:
         if isinstance(other, self.__class__):
-            return self.dataset_id == other.dataset_id and self.created_at == other.created_at
+            return self.dataset_id == other.dataset_id and self.timestamp == other.timestamp
         else:
             return NotImplemented
 
diff --git a/airflow/www/static/js/datasets/Details.tsx b/airflow/www/static/js/datasets/Details.tsx
index 3bbee2a818..a99efdd42f 100644
--- a/airflow/www/static/js/datasets/Details.tsx
+++ b/airflow/www/static/js/datasets/Details.tsx
@@ -67,8 +67,8 @@ const DatasetDetails = ({ datasetId, onBack }: Props) => {
   const columns = useMemo(
     () => [
       {
-        Header: 'Created At',
-        accessor: 'createdAt',
+        Header: 'Timestamp',
+        accessor: 'timestamp',
         Cell: TimeCell,
       },
       {
diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts
index c798721eb1..bccbd79a23 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -1499,7 +1499,7 @@ export interface components {
     /** @description The task map index that updated the dataset. */
    source_map_index?: number | null;
     /** @description The dataset event creation time */
-    created_at?: string;
+    timestamp?: string;
   };
   /**
    * @description A collection of dataset events.
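For code that queries dataset events directly, the practical effect of the rename is that filters and ordering now go through DatasetEvent.timestamp. A minimal sketch of the query pattern, mirroring _get_upstream_dataset_events above; the helper name and the session/cutoff arguments are illustrative and not part of this commit:

from datetime import datetime
from typing import List

from sqlalchemy.orm import Session

from airflow.models.dataset import DatasetEvent


def events_after(session: Session, dataset_id: int, cutoff: datetime) -> List[DatasetEvent]:
    # Illustrative helper (not in the commit): fetch one dataset's events
    # newer than `cutoff`, oldest first.
    return (
        session.query(DatasetEvent)
        # `timestamp` replaces `created_at`; the composite index renamed to
        # `idx_dataset_id_timestamp` above covers exactly this access pattern.
        .filter(DatasetEvent.dataset_id == dataset_id, DatasetEvent.timestamp > cutoff)
        .order_by(DatasetEvent.timestamp)
        .all()
    )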
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 1a25475df1..3b65450b99 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1543,8 +1543,8 @@ def test__get_upstream_dataset_events_with_prior(configured_app):
     first_timestamp = pendulum.datetime(2022, 1, 1, tz='UTC')
     session.add_all(
         [
-            DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp),
-            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp),
+            DatasetEvent(dataset_id=dataset1a.id, timestamp=first_timestamp),
+            DatasetEvent(dataset_id=dataset1b.id, timestamp=first_timestamp),
         ]
     )
     dr1 = DagRun(
@@ -1557,9 +1557,9 @@
     session.add(dr1)
     session.add_all(
         [
-            DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=2000)),
-            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=3000)),
-            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=4000)),
+            DatasetEvent(dataset_id=dataset1a.id, timestamp=first_timestamp.add(microseconds=2000)),
+            DatasetEvent(dataset_id=dataset1b.id, timestamp=first_timestamp.add(microseconds=3000)),
+            DatasetEvent(dataset_id=dataset1b.id, timestamp=first_timestamp.add(microseconds=4000)),
         ]
     )
     dr2 = DagRun(  # this dag run should be ignored
@@ -1578,15 +1578,13 @@
     )
     dr3.dag = dag2
     session.add(dr3)
-    session.add_all(
-        [DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=5000))]
-    )
+    session.add_all([DatasetEvent(dataset_id=dataset1a.id, timestamp=first_timestamp.add(microseconds=5000))])
     session.commit()
     session.expunge_all()
 
     events = _get_upstream_dataset_events(dag_run=dr3, session=session)
 
-    event_times = [x.created_at for x in events]
+    event_times = [x.timestamp for x in events]
     assert event_times == [
         first_timestamp.add(microseconds=2000),
         first_timestamp.add(microseconds=3000),
@@ -1612,7 +1610,7 @@ class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
         assert len(result) == 1
         created_at = pendulum.now('UTC')
         # make sure whatever is returned by this func is what comes out in response.
-        d = DatasetEvent(dataset_id=1, created_at=created_at)
+        d = DatasetEvent(dataset_id=1, timestamp=created_at)
         d.dataset = Dataset(id=1, uri='hello', created_at=created_at, updated_at=created_at)
         mock_get_events.return_value = [d]
         response = self.client.get(
@@ -1623,7 +1621,7 @@
         expected_response = {
             'dataset_events': [
                 {
-                    'created_at': str(created_at),
+                    'timestamp': str(created_at),
                     'dataset_id': 1,
                     'dataset_uri': d.dataset.uri,
                     'extra': None,
diff --git a/tests/api_connexion/endpoints/test_dataset_endpoint.py b/tests/api_connexion/endpoints/test_dataset_endpoint.py
index a06142dcbb..0d025d3c6c 100644
--- a/tests/api_connexion/endpoints/test_dataset_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dataset_endpoint.py
@@ -269,7 +269,7 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
             "source_map_index": -1,
         }
 
-        events = [DatasetEvent(id=i, created_at=timezone.parse(self.default_time), **common) for i in [1, 2]]
+        events = [DatasetEvent(id=i, timestamp=timezone.parse(self.default_time), **common) for i in [1, 2]]
         session.add_all(events)
         session.commit()
         assert session.query(DatasetEvent).count() == 2
@@ -282,13 +282,13 @@
             "dataset_events": [
                 {
                     "id": 1,
-                    "created_at": self.default_time,
+                    "timestamp": self.default_time,
                     **common,
                     "dataset_uri": d.uri,
                 },
                 {
                     "id": 2,
-                    "created_at": self.default_time,
+                    "timestamp": self.default_time,
                     **common,
                     "dataset_uri": d.uri,
                 },
@@ -328,7 +328,7 @@
                 source_task_id=f"task{i}",
                 source_run_id=f"run{i}",
                 source_map_index=i,
-                created_at=timezone.parse(self.default_time),
+                timestamp=timezone.parse(self.default_time),
             )
             for i in [1, 2, 3]
         ]
@@ -353,7 +353,7 @@
                     "source_task_id": "task2",
                     "source_run_id": "run2",
                     "source_map_index": 2,
-                    "created_at": self.default_time,
+                    "timestamp": self.default_time,
                 }
             ],
             "total_entries": 1,
         }
@@ -369,7 +369,7 @@
                 source_task_id="bar",
                 source_run_id="custom",
                 source_map_index=-1,
-                created_at=timezone.parse(self.default_time),
+                timestamp=timezone.parse(self.default_time),
             )
             for i in [1, 2]
         ]
@@ -425,7 +425,7 @@ class TestGetDatasetEventsEndpointPagination(TestDatasetEndpoint):
                 source_task_id="bar",
                 source_run_id=f"run{i}",
                 source_map_index=-1,
-                created_at=timezone.parse(self.default_time),
+                timestamp=timezone.parse(self.default_time),
             )
             for i in range(1, 10)
         ]
@@ -447,7 +447,7 @@
                 source_task_id="bar",
                 source_run_id=f"run{i}",
                 source_map_index=-1,
-                created_at=timezone.parse(self.default_time),
+                timestamp=timezone.parse(self.default_time),
             )
             for i in range(1, 110)
         ]
@@ -469,7 +469,7 @@
                 source_task_id="bar",
                 source_run_id=f"run{i}",
                 source_map_index=-1,
-                created_at=timezone.parse(self.default_time),
+                timestamp=timezone.parse(self.default_time),
             )
             for i in range(1, 200)
         ]
diff --git a/tests/api_connexion/schemas/test_dataset_schema.py b/tests/api_connexion/schemas/test_dataset_schema.py
index f6ed25b85c..46e2732f2e 100644
--- a/tests/api_connexion/schemas/test_dataset_schema.py
+++ b/tests/api_connexion/schemas/test_dataset_schema.py
@@ -111,7 +111,7 @@ class TestDatasetEventSchema(TestDatasetSchemaBase):
             source_task_id="bar",
             source_run_id="custom",
             source_map_index=-1,
-            created_at=timezone.parse(self.timestamp),
+            timestamp=timezone.parse(self.timestamp),
         )
         session.add(event)
         session.flush()
@@ -125,7 +125,7 @@ class TestDatasetEventSchema(TestDatasetSchemaBase):
             "source_task_id": "bar",
             "source_run_id": "custom",
             "source_map_index": -1,
-            "created_at": self.timestamp,
+            "timestamp": self.timestamp,
         }
 
 
@@ -140,7 +140,7 @@
             "source_map_index": -1,
         }
 
-        events = [DatasetEvent(id=i, created_at=timezone.parse(self.timestamp), **common) for i in [1, 2]]
+        events = [DatasetEvent(id=i, timestamp=timezone.parse(self.timestamp), **common) for i in [1, 2]]
         session.add_all(events)
         session.flush()
         serialized_data = dataset_event_collection_schema.dump(
@@ -148,8 +148,8 @@
         )
         assert serialized_data == {
             "dataset_events": [
-                {"id": 1, "created_at": self.timestamp, **common},
-                {"id": 2, "created_at": self.timestamp, **common},
+                {"id": 1, "timestamp": self.timestamp, **common},
+                {"id": 2, "timestamp": self.timestamp, **common},
             ],
             "total_entries": 2,
         }
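For REST clients, order_by=timestamp replaces order_by=created_at, and responses carry a timestamp key, as the test expectations above show. A minimal usage sketch, assuming a local webserver, basic-auth credentials, and the /api/v1/datasets/events path, none of which are taken from this commit:

import requests

# Assumed host and credentials; the path is an assumption matching the
# get_dataset_events handler changed above.
resp = requests.get(
    "http://localhost:8080/api/v1/datasets/events",
    params={"order_by": "timestamp", "limit": 10},  # "created_at" is no longer an allowed sort key
    auth=("admin", "admin"),
)
resp.raise_for_status()
for event in resp.json()["dataset_events"]:
    print(event["timestamp"], event["dataset_uri"])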