Dev-iL commented on code in PR #55954:
URL: https://github.com/apache/airflow/pull/55954#discussion_r2382807429
##########
airflow-core/src/airflow/models/connection.py:
##########
@@ -126,19 +137,21 @@ class Connection(Base, LoggingMixin):
__tablename__ = "connection"
- id = Column(Integer(), primary_key=True)
- conn_id = Column(String(ID_LEN), unique=True, nullable=False)
- conn_type = Column(String(500), nullable=False)
- description = Column(Text().with_variant(Text(5000),
"mysql").with_variant(String(5000), "sqlite"))
- host = Column(String(500))
- schema = Column(String(500))
- login = Column(Text())
- _password = Column("password", Text())
- port = Column(Integer())
- is_encrypted = Column(Boolean, unique=False, default=False)
- is_extra_encrypted = Column(Boolean, unique=False, default=False)
- team_id = Column(UUIDType(binary=False), ForeignKey("team.id"),
nullable=True)
- _extra = Column("extra", Text())
+ id: Mapped[int] = mapped_column(Integer(), primary_key=True)
+ conn_id: Mapped[str] = mapped_column(String(ID_LEN), unique=True,
nullable=False)
Review Comment:
Why is it not the following?
```suggestion
conn_id: Mapped[str] = mapped_column(StringID(), unique=True,
nullable=False)
```
##########
airflow-core/src/airflow/models/backfill.py:
##########
@@ -117,23 +127,27 @@ class Backfill(Base):
__tablename__ = "backfill"
- id = Column(Integer, primary_key=True, autoincrement=True)
- dag_id = Column(StringID(), nullable=False)
- from_date = Column(UtcDateTime, nullable=False)
- to_date = Column(UtcDateTime, nullable=False)
- dag_run_conf = Column(JSONField(json=json), nullable=False, default={})
- is_paused = Column(Boolean, default=False)
+ id: Mapped[int] = mapped_column(Integer, primary_key=True,
autoincrement=True)
+ dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
+ from_date: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False)
+ to_date: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False)
+ dag_run_conf = mapped_column(JSONField(json=json), nullable=False,
default={})
+ is_paused: Mapped[bool] = mapped_column(Boolean, default=False)
"""
Controls whether new dag runs will be created for this backfill.
Does not pause existing dag runs.
"""
- reprocess_behavior = Column(StringID(), nullable=False,
default=ReprocessBehavior.NONE)
- max_active_runs = Column(Integer, default=10, nullable=False)
- created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
- completed_at = Column(UtcDateTime, nullable=True)
- updated_at = Column(UtcDateTime, default=timezone.utcnow,
onupdate=timezone.utcnow, nullable=False)
- triggering_user_name = Column(
+ reprocess_behavior: Mapped[str] = mapped_column(
+ StringID(), nullable=False, default=ReprocessBehavior.NONE
+ )
+ max_active_runs: Mapped[int] = mapped_column(Integer, default=10,
nullable=False)
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime,
default=timezone.utcnow, nullable=False)
+ completed_at: Mapped[datetime | None] = mapped_column(UtcDateTime,
nullable=True)
Review Comment:
The `nullable` kwarg is unnecessary if `None` is part of the type hint. I'm
not sure it can be removed while we are still supporting SQLAlchemy 1.4, but it
should definitely be an action item when moving to SQLAlchemy 2.0 exclusively.
```python
# Testing code for 2.0:
from sqlalchemy import String, inspect
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "user"
id: Mapped[int] = mapped_column(primary_key=True)
nickname: Mapped[str | None]
class User2(Base):
__tablename__ = "user2"
id: Mapped[int] = mapped_column(primary_key=True)
nickname: Mapped[str | None] = mapped_column(String(30), nullable=True)
class User3(Base):
__tablename__ = "user3"
id: Mapped[int] = mapped_column(primary_key=True)
nickname: Mapped[str]
print(inspect(User).columns.nickname.nullable)
# True
print(inspect(User2).columns.nickname.nullable)
# True
print(inspect(User3).columns.nickname.nullable)
# False
```
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -2202,15 +2201,15 @@ class TaskInstanceNote(Base):
"""For storage of arbitrary notes concerning the task instance."""
__tablename__ = "task_instance_note"
- ti_id = Column(
+ ti_id = mapped_column(
String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"),
primary_key=True,
nullable=False,
)
- user_id = Column(String(128), nullable=True)
- content = Column(String(1000).with_variant(Text(1000), "mysql"))
- created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
- updated_at = Column(UtcDateTime, default=timezone.utcnow,
onupdate=timezone.utcnow, nullable=False)
+ user_id = mapped_column(String(128), nullable=True)
+ content = mapped_column(String(1000).with_variant(Text(1000), "mysql"))
+ created_at = mapped_column(UtcDateTime, default=timezone.utcnow,
nullable=False)
+ updated_at = mapped_column(UtcDateTime, default=timezone.utcnow,
onupdate=timezone.utcnow, nullable=False)
Review Comment:
Missing `Mapped[...]`
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -148,57 +147,61 @@ class DagRun(Base, LoggingMixin):
__tablename__ = "dag_run"
- id = Column(Integer, primary_key=True)
- dag_id = Column(StringID(), nullable=False)
- queued_at = Column(UtcDateTime)
- logical_date = Column(UtcDateTime, nullable=True)
- start_date = Column(UtcDateTime)
- end_date = Column(UtcDateTime)
- _state = Column("state", String(50), default=DagRunState.QUEUED)
- run_id = Column(StringID(), nullable=False)
- creating_job_id = Column(Integer)
- run_type = Column(String(50), nullable=False)
- triggered_by = Column(
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
+ queued_at: Mapped[datetime | None] = mapped_column(UtcDateTime)
+ logical_date: Mapped[datetime | None] = mapped_column(UtcDateTime,
nullable=True)
+ start_date: Mapped[datetime | None] = mapped_column(UtcDateTime)
+ end_date: Mapped[datetime | None] = mapped_column(UtcDateTime)
Review Comment:
Note that every date, and not just `logical_date`, is now nullable, judging
by the type hints. Was that intentional?
##########
airflow-core/src/airflow/models/taskreschedule.py:
##########
@@ -49,16 +48,16 @@ class TaskReschedule(Base):
"""TaskReschedule tracks rescheduled task instances."""
__tablename__ = "task_reschedule"
- id = Column(Integer, primary_key=True)
- ti_id = Column(
+ id = mapped_column(Integer, primary_key=True)
+ ti_id = mapped_column(
String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"),
ForeignKey("task_instance.id", ondelete="CASCADE",
name="task_reschedule_ti_fkey"),
nullable=False,
)
- start_date = Column(UtcDateTime, nullable=False)
- end_date = Column(UtcDateTime, nullable=False)
- duration = Column(Integer, nullable=False)
- reschedule_date = Column(UtcDateTime, nullable=False)
+ start_date = mapped_column(UtcDateTime, nullable=False)
+ end_date = mapped_column(UtcDateTime, nullable=False)
+ duration = mapped_column(Integer, nullable=False)
+ reschedule_date = mapped_column(UtcDateTime, nullable=False)
Review Comment:
`Mapped[...]`
##########
airflow-core/src/airflow/models/taskmap.py:
##########
@@ -63,13 +63,13 @@ class TaskMap(TaskInstanceDependencies):
__tablename__ = "task_map"
# Link to upstream TaskInstance creating this dynamic mapping information.
- dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
- task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
- run_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
- map_index = Column(Integer, primary_key=True)
+ dag_id = mapped_column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+ task_id = mapped_column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+ run_id = mapped_column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+ map_index = mapped_column(Integer, primary_key=True)
- length = Column(Integer, nullable=False)
- keys = Column(ExtendedJSON, nullable=True)
+ length = mapped_column(Integer, nullable=False)
+ keys = mapped_column(ExtendedJSON, nullable=True)
Review Comment:
`Mapped[...]`
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -148,57 +147,61 @@ class DagRun(Base, LoggingMixin):
__tablename__ = "dag_run"
- id = Column(Integer, primary_key=True)
- dag_id = Column(StringID(), nullable=False)
- queued_at = Column(UtcDateTime)
- logical_date = Column(UtcDateTime, nullable=True)
- start_date = Column(UtcDateTime)
- end_date = Column(UtcDateTime)
- _state = Column("state", String(50), default=DagRunState.QUEUED)
- run_id = Column(StringID(), nullable=False)
- creating_job_id = Column(Integer)
- run_type = Column(String(50), nullable=False)
- triggered_by = Column(
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
+ queued_at: Mapped[datetime | None] = mapped_column(UtcDateTime)
+ logical_date: Mapped[datetime | None] = mapped_column(UtcDateTime,
nullable=True)
+ start_date: Mapped[datetime | None] = mapped_column(UtcDateTime)
+ end_date: Mapped[datetime | None] = mapped_column(UtcDateTime)
+ _state: Mapped[str] = mapped_column("state", String(50),
default=DagRunState.QUEUED)
+ run_id: Mapped[str] = mapped_column(StringID(), nullable=False)
+ creating_job_id: Mapped[int | None] = mapped_column(Integer)
Review Comment:
The field is now nullable. Intentional?
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -148,57 +147,61 @@ class DagRun(Base, LoggingMixin):
__tablename__ = "dag_run"
- id = Column(Integer, primary_key=True)
- dag_id = Column(StringID(), nullable=False)
- queued_at = Column(UtcDateTime)
- logical_date = Column(UtcDateTime, nullable=True)
- start_date = Column(UtcDateTime)
- end_date = Column(UtcDateTime)
- _state = Column("state", String(50), default=DagRunState.QUEUED)
- run_id = Column(StringID(), nullable=False)
- creating_job_id = Column(Integer)
- run_type = Column(String(50), nullable=False)
- triggered_by = Column(
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
+ queued_at: Mapped[datetime | None] = mapped_column(UtcDateTime)
+ logical_date: Mapped[datetime | None] = mapped_column(UtcDateTime,
nullable=True)
+ start_date: Mapped[datetime | None] = mapped_column(UtcDateTime)
+ end_date: Mapped[datetime | None] = mapped_column(UtcDateTime)
+ _state: Mapped[str] = mapped_column("state", String(50),
default=DagRunState.QUEUED)
+ run_id: Mapped[str] = mapped_column(StringID(), nullable=False)
+ creating_job_id: Mapped[int | None] = mapped_column(Integer)
+ run_type: Mapped[str] = mapped_column(String(50), nullable=False)
+ triggered_by: Mapped[DagRunTriggeredByType | None] = mapped_column(
Review Comment:
Nullability changed
##########
airflow-core/src/airflow/models/renderedtifields.py:
##########
@@ -69,12 +69,12 @@ class RenderedTaskInstanceFields(TaskInstanceDependencies):
__tablename__ = "rendered_task_instance_fields"
- dag_id = Column(StringID(), primary_key=True)
- task_id = Column(StringID(), primary_key=True)
- run_id = Column(StringID(), primary_key=True)
- map_index = Column(Integer, primary_key=True, server_default=text("-1"))
- rendered_fields = Column(sqlalchemy_jsonfield.JSONField(json=json),
nullable=False)
- k8s_pod_yaml = Column(sqlalchemy_jsonfield.JSONField(json=json),
nullable=True)
+ dag_id = mapped_column(StringID(), primary_key=True)
+ task_id = mapped_column(StringID(), primary_key=True)
+ run_id = mapped_column(StringID(), primary_key=True)
+ map_index = mapped_column(Integer, primary_key=True,
server_default=text("-1"))
+ rendered_fields = mapped_column(sqlalchemy_jsonfield.JSONField(json=json),
nullable=False)
+ k8s_pod_yaml = mapped_column(sqlalchemy_jsonfield.JSONField(json=json),
nullable=True)
Review Comment:
Why no `Mapped[...]`?
##########
airflow-core/src/airflow/models/taskinstancehistory.py:
##########
@@ -64,48 +64,48 @@ class TaskInstanceHistory(Base):
"""
__tablename__ = "task_instance_history"
- task_instance_id = Column(
+ task_instance_id = mapped_column(
String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"),
nullable=False,
primary_key=True,
)
- task_id = Column(StringID(), nullable=False)
- dag_id = Column(StringID(), nullable=False)
- run_id = Column(StringID(), nullable=False)
- map_index = Column(Integer, nullable=False, server_default=text("-1"))
- try_number = Column(Integer, nullable=False)
- start_date = Column(UtcDateTime)
- end_date = Column(UtcDateTime)
- duration = Column(Float)
- state = Column(String(20))
- max_tries = Column(Integer, server_default=text("-1"))
- hostname = Column(String(1000))
- unixname = Column(String(1000))
- pool = Column(String(256), nullable=False)
- pool_slots = Column(Integer, default=1, nullable=False)
- queue = Column(String(256))
- priority_weight = Column(Integer)
- operator = Column(String(1000))
- custom_operator_name = Column(String(1000))
- queued_dttm = Column(UtcDateTime)
- scheduled_dttm = Column(UtcDateTime)
- queued_by_job_id = Column(Integer)
- pid = Column(Integer)
- executor = Column(String(1000))
- executor_config = Column(ExecutorConfigType(pickler=dill))
- updated_at = Column(UtcDateTime, default=timezone.utcnow,
onupdate=timezone.utcnow)
- rendered_map_index = Column(String(250))
- context_carrier = Column(MutableDict.as_mutable(ExtendedJSON))
- span_status = Column(String(250), server_default=SpanStatus.NOT_STARTED,
nullable=False)
-
- external_executor_id = Column(StringID())
- trigger_id = Column(Integer)
- trigger_timeout = Column(DateTime)
- next_method = Column(String(1000))
- next_kwargs = Column(MutableDict.as_mutable(ExtendedJSON))
-
- task_display_name = Column(String(2000), nullable=True)
- dag_version_id = Column(UUIDType(binary=False))
+ task_id = mapped_column(StringID(), nullable=False)
+ dag_id = mapped_column(StringID(), nullable=False)
+ run_id = mapped_column(StringID(), nullable=False)
+ map_index = mapped_column(Integer, nullable=False,
server_default=text("-1"))
+ try_number = mapped_column(Integer, nullable=False)
+ start_date = mapped_column(UtcDateTime)
+ end_date = mapped_column(UtcDateTime)
+ duration = mapped_column(Float)
+ state = mapped_column(String(20))
+ max_tries = mapped_column(Integer, server_default=text("-1"))
+ hostname = mapped_column(String(1000))
+ unixname = mapped_column(String(1000))
+ pool = mapped_column(String(256), nullable=False)
+ pool_slots = mapped_column(Integer, default=1, nullable=False)
+ queue = mapped_column(String(256))
+ priority_weight = mapped_column(Integer)
+ operator = mapped_column(String(1000))
+ custom_operator_name = mapped_column(String(1000))
+ queued_dttm = mapped_column(UtcDateTime)
+ scheduled_dttm = mapped_column(UtcDateTime)
+ queued_by_job_id = mapped_column(Integer)
+ pid = mapped_column(Integer)
+ executor = mapped_column(String(1000))
+ executor_config = mapped_column(ExecutorConfigType(pickler=dill))
+ updated_at = mapped_column(UtcDateTime, default=timezone.utcnow,
onupdate=timezone.utcnow)
+ rendered_map_index = mapped_column(String(250))
+ context_carrier = mapped_column(MutableDict.as_mutable(ExtendedJSON))
+ span_status = mapped_column(String(250),
server_default=SpanStatus.NOT_STARTED, nullable=False)
+
+ external_executor_id = mapped_column(StringID())
+ trigger_id = mapped_column(Integer)
+ trigger_timeout = mapped_column(DateTime)
+ next_method = mapped_column(String(1000))
+ next_kwargs = mapped_column(MutableDict.as_mutable(ExtendedJSON))
+
+ task_display_name = mapped_column(String(2000), nullable=True)
+ dag_version_id = mapped_column(UUIDType(binary=False))
Review Comment:
`Mapped[...]`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]