This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 488de3c2050 Partially revert #50825 on database level (#54366)
488de3c2050 is described below
commit 488de3c2050f44d941f9ddb040c1f6b6a43ba0da
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Aug 12 11:04:07 2025 +0100
Partially revert #50825 on database level (#54366)
* Partially revert #50825 on database level
This is alternative to #53820. Here we make the TI.dag_version_id
nullable on the database level. it's still enforced in code
* fixup! Partially revert #50825 on database level
---
airflow-core/docs/img/airflow_erd.sha256 | 2 +-
airflow-core/docs/img/airflow_erd.svg | 3 +-
airflow-core/docs/migrations-ref.rst | 4 +-
.../core_api/datamodels/task_instances.py | 2 +-
.../core_api/openapi/v2-rest-api-generated.yaml | 8 ++-
...> 0076_3_1_0_add_human_in_the_loop_response.py} | 4 +-
...6_3_1_0_make_dag_version_id_non_nullable_in_.py | 81 ----------------------
...py => 0077_3_1_0_add_trigger_id_to_deadline.py} | 0
...> 0078_3_1_0_add_callback_state_to_deadline.py} | 0
..._url_and_template_params_to_dagbundle_model.py} | 0
... 0080_3_1_0_modify_deadline_callback_schema.py} | 0
airflow-core/src/airflow/models/taskinstance.py | 3 +-
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 15 ++--
.../airflow/ui/openapi-gen/requests/types.gen.ts | 2 +-
.../Clear/TaskInstance/ClearTaskInstanceDialog.tsx | 2 +-
.../src/airflow/ui/src/hooks/useSelectedVersion.ts | 2 +-
.../src/airflowctl/api/datamodels/generated.py | 2 +-
17 files changed, 28 insertions(+), 102 deletions(-)
diff --git a/airflow-core/docs/img/airflow_erd.sha256
b/airflow-core/docs/img/airflow_erd.sha256
index 8bfb32a3713..e6031ef83b2 100644
--- a/airflow-core/docs/img/airflow_erd.sha256
+++ b/airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
-625f362919679fe85b2bfb3f9e053261248ac6ec7a974eee51012e55a8105b94
\ No newline at end of file
+b31700355c6c8e073896b9daba68b09c252bf1a05f535ef40aa0cb927d12e9ce
\ No newline at end of file
diff --git a/airflow-core/docs/img/airflow_erd.svg
b/airflow-core/docs/img/airflow_erd.svg
index e6776c5f153..c9e62976b5e 100644
--- a/airflow-core/docs/img/airflow_erd.svg
+++ b/airflow-core/docs/img/airflow_erd.svg
@@ -776,7 +776,6 @@
<text text-anchor="start" x="1770" y="-1943.3"
font-family="Helvetica,sans-Serif" font-size="14.00">dag_version_id</text>
<text text-anchor="start" x="1874" y="-1943.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
<text text-anchor="start" x="1879" y="-1943.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [UUID]</text>
-<text text-anchor="start" x="1931" y="-1943.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
<polygon fill="none" stroke="black" points="1765,-1908.5 1765,-1933.5
2073,-1933.5 2073,-1908.5 1765,-1908.5"/>
<text text-anchor="start" x="1770" y="-1918.3"
font-family="Helvetica,sans-Serif" font-size="14.00">duration</text>
<text text-anchor="start" x="1829" y="-1918.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
@@ -1894,7 +1893,7 @@
<title>dag_version--task_instance</title>
<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1232.29,-1656.53C1266.17,-1642.21 1302.65,-1629.31 1338,-1621.5
1477.03,-1590.78 1639.39,-1590.63 1756.91,-1596.66"/>
<text text-anchor="start" x="1725.91" y="-1585.46" font-family="Times,serif"
font-size="14.00">0..N</text>
-<text text-anchor="start" x="1232.29" y="-1645.33" font-family="Times,serif"
font-size="14.00">1</text>
+<text text-anchor="start" x="1232.29" y="-1645.33" font-family="Times,serif"
font-size="14.00">{0,1}</text>
</g>
<!-- dag_run -->
<g id="node34" class="node">
diff --git a/airflow-core/docs/migrations-ref.rst
b/airflow-core/docs/migrations-ref.rst
index a27474fdead..cf9363c316f 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -47,9 +47,7 @@ Here's the list of all the Database Migrations that are
executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``09fa89ba1710`` | ``40f7c30a228b`` | ``3.1.0`` | Add
trigger_id to deadline. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
-| ``40f7c30a228b`` | ``5d3072c51bac`` | ``3.1.0`` | Add Human
In the Loop Detail table. |
-+-------------------------+------------------+-------------------+--------------------------------------------------------------+
-| ``5d3072c51bac`` | ``ffdb0566c7c0`` | ``3.1.0`` | Make
dag_version_id non-nullable in TaskInstance. |
+| ``40f7c30a228b`` | ``ffdb0566c7c0`` | ``3.1.0`` | Add Human
In the Loop Detail table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``ffdb0566c7c0`` | ``66a7743fe20e`` | ``3.1.0`` | Add
dag_favorite table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
index 3611df275e7..fb87fb66c0a 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -44,7 +44,6 @@ class TaskInstanceResponse(BaseModel):
id: str
task_id: str
dag_id: str
- dag_version: DagVersionResponse
run_id: str = Field(alias="dag_run_id")
map_index: int
logical_date: datetime | None
@@ -77,6 +76,7 @@ class TaskInstanceResponse(BaseModel):
)
trigger: TriggerResponse | None
queued_by_job: JobResponse | None = Field(alias="triggerer_job")
+ dag_version: DagVersionResponse | None
class TaskInstanceCollectionResponse(BaseModel):
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 219da4bd06f..f42e1846bdf 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -11202,8 +11202,6 @@ components:
dag_id:
type: string
title: Dag Id
- dag_version:
- $ref: '#/components/schemas/DagVersionResponse'
dag_run_id:
type: string
title: Dag Run Id
@@ -11331,12 +11329,15 @@ components:
anyOf:
- $ref: '#/components/schemas/JobResponse'
- type: 'null'
+ dag_version:
+ anyOf:
+ - $ref: '#/components/schemas/DagVersionResponse'
+ - type: 'null'
type: object
required:
- id
- task_id
- dag_id
- - dag_version
- dag_run_id
- map_index
- logical_date
@@ -11365,6 +11366,7 @@ components:
- rendered_map_index
- trigger
- triggerer_job
+ - dag_version
title: TaskInstanceResponse
description: TaskInstance serializer for responses.
TaskInstanceState:
diff --git
a/airflow-core/src/airflow/migrations/versions/0077_3_1_0_add_human_in_the_loop_response.py
b/airflow-core/src/airflow/migrations/versions/0076_3_1_0_add_human_in_the_loop_response.py
similarity index 98%
rename from
airflow-core/src/airflow/migrations/versions/0077_3_1_0_add_human_in_the_loop_response.py
rename to
airflow-core/src/airflow/migrations/versions/0076_3_1_0_add_human_in_the_loop_response.py
index 61f950f5d12..4b1c5a36e89 100644
---
a/airflow-core/src/airflow/migrations/versions/0077_3_1_0_add_human_in_the_loop_response.py
+++
b/airflow-core/src/airflow/migrations/versions/0076_3_1_0_add_human_in_the_loop_response.py
@@ -20,7 +20,7 @@
Add Human In the Loop Detail table.
Revision ID: 40f7c30a228b
-Revises: 5d3072c51bac
+Revises: ffdb0566c7c0
Create Date: 2025-07-04 15:05:19.459197
"""
@@ -37,7 +37,7 @@ from airflow.utils.sqlalchemy import UtcDateTime
# revision identifiers, used by Alembic.
revision = "40f7c30a228b"
-down_revision = "5d3072c51bac"
+down_revision = "ffdb0566c7c0"
branch_labels = None
depends_on = None
airflow_version = "3.1.0"
diff --git
a/airflow-core/src/airflow/migrations/versions/0076_3_1_0_make_dag_version_id_non_nullable_in_.py
b/airflow-core/src/airflow/migrations/versions/0076_3_1_0_make_dag_version_id_non_nullable_in_.py
deleted file mode 100644
index cbd183f7b8f..00000000000
---
a/airflow-core/src/airflow/migrations/versions/0076_3_1_0_make_dag_version_id_non_nullable_in_.py
+++ /dev/null
@@ -1,81 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-Make dag_version_id non-nullable in TaskInstance.
-
-Revision ID: 5d3072c51bac
-Revises: ffdb0566c7c0
-Create Date: 2025-05-20 10:38:25.635779
-
-"""
-
-from __future__ import annotations
-
-import sqlalchemy as sa
-from alembic import op
-from sqlalchemy_utils import UUIDType
-
-# revision identifiers, used by Alembic.
-revision = "5d3072c51bac"
-down_revision = "ffdb0566c7c0"
-branch_labels = None
-depends_on = None
-airflow_version = "3.1.0"
-
-
-def upgrade():
- """Apply make dag_version_id non-nullable in TaskInstance."""
- conn = op.get_bind()
- if conn.dialect.name == "postgresql":
- update_query = sa.text("""
- UPDATE task_instance
- SET dag_version_id = latest_versions.id
- FROM (
- SELECT DISTINCT ON (dag_id) dag_id, id
- FROM dag_version
- ORDER BY dag_id, created_at DESC
- ) latest_versions
- WHERE task_instance.dag_id = latest_versions.dag_id
- AND task_instance.dag_version_id IS NULL
- """)
- else:
- update_query = sa.text("""
- UPDATE task_instance
- SET dag_version_id = (
- SELECT id FROM (
- SELECT id, dag_id,
- ROW_NUMBER() OVER (PARTITION BY dag_id ORDER BY created_at
DESC) as rn
- FROM dag_version
- ) ranked_versions
- WHERE ranked_versions.dag_id = task_instance.dag_id
- AND ranked_versions.rn = 1
- )
- WHERE task_instance.dag_version_id IS NULL
- """)
-
- op.execute(update_query)
-
- with op.batch_alter_table("task_instance", schema=None) as batch_op:
- batch_op.alter_column("dag_version_id",
existing_type=UUIDType(binary=False), nullable=False)
-
-
-def downgrade():
- """Unapply make dag_version_id non-nullable in TaskInstance."""
- with op.batch_alter_table("task_instance", schema=None) as batch_op:
- batch_op.alter_column("dag_version_id",
existing_type=UUIDType(binary=False), nullable=True)
diff --git
a/airflow-core/src/airflow/migrations/versions/0078_3_1_0_add_trigger_id_to_deadline.py
b/airflow-core/src/airflow/migrations/versions/0077_3_1_0_add_trigger_id_to_deadline.py
similarity index 100%
rename from
airflow-core/src/airflow/migrations/versions/0078_3_1_0_add_trigger_id_to_deadline.py
rename to
airflow-core/src/airflow/migrations/versions/0077_3_1_0_add_trigger_id_to_deadline.py
diff --git
a/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_callback_state_to_deadline.py
b/airflow-core/src/airflow/migrations/versions/0078_3_1_0_add_callback_state_to_deadline.py
similarity index 100%
rename from
airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_callback_state_to_deadline.py
rename to
airflow-core/src/airflow/migrations/versions/0078_3_1_0_add_callback_state_to_deadline.py
diff --git
a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_add_url_and_template_params_to_dagbundle_model.py
b/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_url_and_template_params_to_dagbundle_model.py
similarity index 100%
rename from
airflow-core/src/airflow/migrations/versions/0080_3_1_0_add_url_and_template_params_to_dagbundle_model.py
rename to
airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_url_and_template_params_to_dagbundle_model.py
diff --git
a/airflow-core/src/airflow/migrations/versions/0081_3_1_0_modify_deadline_callback_schema.py
b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py
similarity index 100%
rename from
airflow-core/src/airflow/migrations/versions/0081_3_1_0_modify_deadline_callback_schema.py
rename to
airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 6dcb7f9b08d..3f4dc544550 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -527,7 +527,8 @@ class TaskInstance(Base, LoggingMixin):
_task_display_property_value = Column("task_display_name", String(2000),
nullable=True)
dag_version_id = Column(
- UUIDType(binary=False), ForeignKey("dag_version.id",
ondelete="RESTRICT"), nullable=False
+ UUIDType(binary=False),
+ ForeignKey("dag_version.id", ondelete="RESTRICT"),
)
dag_version = relationship("DagVersion", back_populates="task_instances")
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 58013b0e48d..abfcc81b079 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -4752,9 +4752,6 @@ export const $TaskInstanceResponse = {
type: 'string',
title: 'Dag Id'
},
- dag_version: {
- '$ref': '#/components/schemas/DagVersionResponse'
- },
dag_run_id: {
type: 'string',
title: 'Dag Run Id'
@@ -5000,10 +4997,20 @@ export const $TaskInstanceResponse = {
type: 'null'
}
]
+ },
+ dag_version: {
+ anyOf: [
+ {
+ '$ref': '#/components/schemas/DagVersionResponse'
+ },
+ {
+ type: 'null'
+ }
+ ]
}
},
type: 'object',
- required: ['id', 'task_id', 'dag_id', 'dag_version', 'dag_run_id',
'map_index', 'logical_date', 'run_after', 'start_date', 'end_date', 'duration',
'state', 'try_number', 'max_tries', 'task_display_name', 'dag_display_name',
'hostname', 'unixname', 'pool', 'pool_slots', 'queue', 'priority_weight',
'operator', 'queued_when', 'scheduled_when', 'pid', 'executor',
'executor_config', 'note', 'rendered_map_index', 'trigger', 'triggerer_job'],
+ required: ['id', 'task_id', 'dag_id', 'dag_run_id', 'map_index',
'logical_date', 'run_after', 'start_date', 'end_date', 'duration', 'state',
'try_number', 'max_tries', 'task_display_name', 'dag_display_name', 'hostname',
'unixname', 'pool', 'pool_slots', 'queue', 'priority_weight', 'operator',
'queued_when', 'scheduled_when', 'pid', 'executor', 'executor_config', 'note',
'rendered_map_index', 'trigger', 'triggerer_job', 'dag_version'],
title: 'TaskInstanceResponse',
description: 'TaskInstance serializer for responses.'
} as const;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 31107d46e5e..b0f1a6e5d27 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1299,7 +1299,6 @@ export type TaskInstanceResponse = {
id: string;
task_id: string;
dag_id: string;
- dag_version: DagVersionResponse;
dag_run_id: string;
map_index: number;
logical_date: string | null;
@@ -1331,6 +1330,7 @@ export type TaskInstanceResponse = {
};
trigger: TriggerResponse | null;
triggerer_job: JobResponse | null;
+ dag_version: DagVersionResponse | null;
};
/**
diff --git
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
index 1f4e1886ea7..62c0426790c 100644
---
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
+++
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
@@ -98,7 +98,7 @@ const ClearTaskInstanceDialog = ({ onClose, open,
taskInstance }: Props) => {
// Check if bundle versions are different
const currentDagBundleVersion = dagDetails?.bundle_version;
- const taskInstanceDagVersionBundleVersion =
taskInstance.dag_version.bundle_version;
+ const taskInstanceDagVersionBundleVersion =
taskInstance.dag_version?.bundle_version;
const bundleVersionsDiffer = currentDagBundleVersion !==
taskInstanceDagVersionBundleVersion;
const shouldShowBundleVersionOption =
bundleVersionsDiffer &&
diff --git a/airflow-core/src/airflow/ui/src/hooks/useSelectedVersion.ts
b/airflow-core/src/airflow/ui/src/hooks/useSelectedVersion.ts
index 2ed274ef4b5..fb33fca1719 100644
--- a/airflow-core/src/airflow/ui/src/hooks/useSelectedVersion.ts
+++ b/airflow-core/src/airflow/ui/src/hooks/useSelectedVersion.ts
@@ -82,7 +82,7 @@ const useSelectedVersion = (): number | undefined => {
const selectedVersionNumber =
selectedVersionUrl ??
- (mappedTaskInstanceData ?
mappedTaskInstanceData.dag_version.version_number : undefined) ??
+ mappedTaskInstanceData?.dag_version?.version_number ??
(runData?.dag_versions ?? []).at(-1)?.version_number ??
dagData?.latest_dag_version?.version_number;
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index cdc132c6e58..ac72e6da31f 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -1618,7 +1618,6 @@ class TaskInstanceResponse(BaseModel):
id: Annotated[str, Field(title="Id")]
task_id: Annotated[str, Field(title="Task Id")]
dag_id: Annotated[str, Field(title="Dag Id")]
- dag_version: DagVersionResponse
dag_run_id: Annotated[str, Field(title="Dag Run Id")]
map_index: Annotated[int, Field(title="Map Index")]
logical_date: Annotated[datetime | None, Field(title="Logical Date")] =
None
@@ -1648,6 +1647,7 @@ class TaskInstanceResponse(BaseModel):
rendered_fields: Annotated[dict[str, Any] | None, Field(title="Rendered
Fields")] = None
trigger: TriggerResponse | None = None
triggerer_job: JobResponse | None = None
+ dag_version: DagVersionResponse | None = None
class TaskResponse(BaseModel):