This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 285c23a2f9 Use simple Json in Dataset & DatasetEvent extra field (#25321)
285c23a2f9 is described below

commit 285c23a2f90f4c765053aedbd3f92c9f58a84d28
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Wed Jul 27 06:56:48 2022 +0100

    Use simple Json in Dataset & DatasetEvent extra field (#25321)

    Replaces ExtendedJson with sqlalchemy_jsonfield type
---
 airflow/api_connexion/schemas/dataset_schema.py             | 5 +++--
 airflow/migrations/versions/0114_2_4_0_add_dataset_model.py | 8 ++++----
 airflow/models/dataset.py                                   | 8 +++++---
 tests/api_connexion/endpoints/test_dag_run_endpoint.py      | 2 +-
 tests/api_connexion/endpoints/test_dataset_endpoint.py      | 2 +-
 5 files changed, 14 insertions(+), 11 deletions(-)

diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py
index 06c1dc866d..a2bc56f89d 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -20,6 +20,7 @@ from typing import List, NamedTuple
 from marshmallow import Schema, fields
 from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
 
+from airflow.api_connexion.schemas.common_schema import JsonObjectField
 from airflow.models.dataset import Dataset, DatasetEvent
 
 
@@ -33,7 +34,7 @@ class DatasetSchema(SQLAlchemySchema):
 
     id = auto_field()
     uri = auto_field()
-    extra = fields.Dict()
+    extra = JsonObjectField()
     created_at = auto_field()
     updated_at = auto_field()
 
@@ -67,7 +68,7 @@ class DatasetEventSchema(SQLAlchemySchema):
     id = auto_field()
     dataset_id = auto_field()
     dataset_uri = fields.String(attribute='dataset.uri', dump_only=True)
-    extra = fields.Dict()
+    extra = JsonObjectField()
     source_task_id = auto_field()
     source_dag_id = auto_field()
     source_run_id = auto_field()
diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
index 8cc5d9dc2e..1903cdadf3 100644
--- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
+++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
@@ -23,13 +23,13 @@ Revises: 44b7034f6bdc
 Create Date: 2022-06-22 14:37:20.880672
 
 """
-
 import sqlalchemy as sa
+import sqlalchemy_jsonfield
 from alembic import op
 from sqlalchemy import Integer, String, func
 
 from airflow.migrations.db_types import TIMESTAMP, StringID
-from airflow.utils.sqlalchemy import ExtendedJSON
+from airflow.settings import json
 
 revision = '0038cd0c28b4'
 down_revision = '44b7034f6bdc'
@@ -55,7 +55,7 @@ def _create_dataset_table():
             ),
             nullable=False,
         ),
-        sa.Column('extra', ExtendedJSON, nullable=True),
+        sa.Column('extra', sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}),
         sa.Column('created_at', TIMESTAMP, nullable=False),
         sa.Column('updated_at', TIMESTAMP, nullable=False),
         sqlite_autoincrement=True,  # ensures PK values not reused
@@ -123,7 +123,7 @@ def _create_dataset_event_table():
         'dataset_event',
         sa.Column('id', Integer, primary_key=True, autoincrement=True),
         sa.Column('dataset_id', Integer, nullable=False),
-        sa.Column('extra', ExtendedJSON, nullable=True),
+        sa.Column('extra', sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}),
         sa.Column('source_task_id', String(250), nullable=True),
         sa.Column('source_dag_id', String(250), nullable=True),
         sa.Column('source_run_id', String(250), nullable=True),
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index 21373106b5..94b28b6bb0 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -17,12 +17,14 @@
 # under the License.
 from urllib.parse import urlparse
 
+import sqlalchemy_jsonfield
 from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, PrimaryKeyConstraint, String, text
 from sqlalchemy.orm import relationship
 
 from airflow.models.base import ID_LEN, Base, StringID
+from airflow.settings import json
 from airflow.utils import timezone
-from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+from airflow.utils.sqlalchemy import UtcDateTime
 
 
 class Dataset(Base):
@@ -46,7 +48,7 @@ class Dataset(Base):
         ),
         nullable=False,
     )
-    extra = Column(ExtendedJSON, nullable=True)
+    extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
     created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
     updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
 
@@ -219,7 +221,7 @@ class DatasetEvent(Base):
 
     id = Column(Integer, primary_key=True, autoincrement=True)
     dataset_id = Column(Integer, nullable=False)
-    extra = Column(ExtendedJSON, nullable=True)
+    extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
     source_task_id = Column(StringID(), nullable=True)
     source_dag_id = Column(StringID(), nullable=True)
     source_run_id = Column(StringID(), nullable=True)
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 3b65450b99..969fd5748d 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1624,7 +1624,7 @@ class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
                     'timestamp': str(created_at),
                     'dataset_id': 1,
                     'dataset_uri': d.dataset.uri,
-                    'extra': None,
+                    'extra': {},
                     'id': None,
                     'source_dag_id': None,
                     'source_map_index': None,
diff --git a/tests/api_connexion/endpoints/test_dataset_endpoint.py b/tests/api_connexion/endpoints/test_dataset_endpoint.py
index 0d025d3c6c..76cb18dd98 100644
--- a/tests/api_connexion/endpoints/test_dataset_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dataset_endpoint.py
@@ -348,7 +348,7 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
                     "id": 2,
                     "dataset_id": 2,
                     "dataset_uri": datasets[1].uri,
-                    "extra": None,
+                    "extra": {},
                     "source_dag_id": "dag2",
                     "source_task_id": "task2",
                     "source_run_id": "run2",