This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 32f3fcc28ad Do not backfill old DagRun.created_at (#63825)
32f3fcc28ad is described below
commit 32f3fcc28ad1d01e37c2d960e82c610ce838dcb9
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Wed Mar 18 13:32:12 2026 +0800
Do not backfill old DagRun.created_at (#63825)
Do not backfill old DagRun.created_at
---
.../0106_3_2_0_add_partition_key_to_backfill_dag_run.py | 17 +++++------------
airflow-core/src/airflow/models/dagrun.py | 5 ++++-
2 files changed, 9 insertions(+), 13 deletions(-)
diff --git a/airflow-core/src/airflow/migrations/versions/0106_3_2_0_add_partition_key_to_backfill_dag_run.py b/airflow-core/src/airflow/migrations/versions/0106_3_2_0_add_partition_key_to_backfill_dag_run.py
index 422e9b8f70d..ca4c1b0996d 100644
--- a/airflow-core/src/airflow/migrations/versions/0106_3_2_0_add_partition_key_to_backfill_dag_run.py
+++ b/airflow-core/src/airflow/migrations/versions/0106_3_2_0_add_partition_key_to_backfill_dag_run.py
@@ -44,15 +44,9 @@ airflow_version = "3.2.0"
def upgrade():
"""Apply Add partition_key to backfill_dag_run."""
op.add_column("dag_run", sa.Column("created_at", UtcDateTime(timezone=True), nullable=True))
- op.execute("update dag_run set created_at = run_after;")
-
- with disable_sqlite_fkeys(op):
- with op.batch_alter_table("dag_run", schema=None) as batch_op:
- batch_op.alter_column("created_at", existing_type=UtcDateTime(timezone=True), nullable=False)
-
- with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
- batch_op.add_column(sa.Column("partition_key", StringID(), nullable=True))
- batch_op.alter_column("logical_date", existing_type=sa.TIMESTAMP(), nullable=True)
+ with disable_sqlite_fkeys(op), op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("partition_key", StringID(), nullable=True))
+ batch_op.alter_column("logical_date", existing_type=sa.TIMESTAMP(), nullable=True)
def downgrade():
@@ -62,6 +56,5 @@ def downgrade():
with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
batch_op.alter_column("logical_date", existing_type=sa.TIMESTAMP(), nullable=False)
batch_op.drop_column("partition_key")
-
- with op.batch_alter_table("dag_run", schema=None) as batch_op:
- batch_op.drop_column("created_at")
+ with op.batch_alter_table("dag_run", schema=None) as batch_op:
+ batch_op.drop_column("created_at")
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index 9923781fbe5..23fdfb72564 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -191,7 +191,10 @@ class DagRun(Base, LoggingMixin):
ForeignKey("log_template.id", name="task_instance_log_template_id_fkey", ondelete="NO ACTION"),
default=select(func.max(LogTemplate.__table__.c.id)),
)
- created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow)
+ # This is nullable because it's too costly to migrate dagruns created prior
+ # to this column's addition (Airflow 3.2.0). If you want a reasonable
+ # meaningful non-null value, use ``dr.created_at or dr.run_after``.
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=True, default=timezone.utcnow)
updated_at: Mapped[datetime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow
)