This is an automated email from the ASF dual-hosted git repository.
jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2d9b7696b7e feat: add partition_key to DagRunAssetReference (#61725)
2d9b7696b7e is described below
commit 2d9b7696b7efce35a5a142f09975756f795d4f12
Author: Wei Lee <[email protected]>
AuthorDate: Thu Feb 12 23:22:27 2026 +0800
feat: add partition_key to DagRunAssetReference (#61725)
* feat: add partition_key to DagRunAssetReference
* test: extend test case to include partition_key
* feat: add migration
* fixup! feat: add migration
---
.../airflow/api_fastapi/core_api/datamodels/assets.py | 3 ++-
.../core_api/openapi/v2-rest-api-generated.yaml | 8 +++++++-
.../execution_api/datamodels/asset_event.py | 1 +
.../api_fastapi/execution_api/versions/v2025_11_07.py | 7 ++++++-
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 15 +++++++++++++--
.../src/airflow/ui/openapi-gen/requests/types.gen.ts | 3 ++-
.../api_fastapi/core_api/routes/public/test_assets.py | 4 ++++
.../core_api/routes/public/test_dag_run.py | 19 +++++++++++++++----
.../src/airflowctl/api/datamodels/generated.py | 3 ++-
airflow-ctl/tests/airflow_ctl/api/test_operations.py | 1 +
task-sdk/src/airflow/sdk/api/datamodels/_generated.py | 1 +
11 files changed, 54 insertions(+), 11 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
index d6a1106dbc2..c6cb2fa2827 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -113,7 +113,7 @@ class AssetAliasCollectionResponse(BaseModel):
class DagRunAssetReference(StrictBaseModel):
- """DAGRun serializer for asset responses."""
+ """DagRun serializer for asset responses."""
run_id: str
dag_id: str
@@ -123,6 +123,7 @@ class DagRunAssetReference(StrictBaseModel):
state: str
data_interval_start: datetime | None
data_interval_end: datetime | None
+ partition_key: str | None
class AssetEventResponse(BaseModel):
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 11c8007ada6..40d88a96b82 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -11043,6 +11043,11 @@ components:
format: date-time
- type: 'null'
title: Data Interval End
+ partition_key:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Partition Key
additionalProperties: false
type: object
required:
@@ -11054,8 +11059,9 @@ components:
- state
- data_interval_start
- data_interval_end
+ - partition_key
title: DagRunAssetReference
- description: DAGRun serializer for asset responses.
+ description: DagRun serializer for asset responses.
DagRunState:
type: string
enum:
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py
index 64623c17fb1..f6c3ce82669 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py
@@ -36,6 +36,7 @@ class DagRunAssetReference(StrictBaseModel):
state: str
data_interval_start: datetime | None
data_interval_end: datetime | None
+ partition_key: str | None
class AssetEventResponse(BaseModel):
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py
index 9363d825cef..117ba492455 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py
@@ -19,7 +19,11 @@ from __future__ import annotations
from cadwyn import ResponseInfo, VersionChange,
convert_response_to_previous_version_for, schema
-from airflow.api_fastapi.execution_api.datamodels.asset_event import
AssetEventResponse, AssetEventsResponse
+from airflow.api_fastapi.execution_api.datamodels.asset_event import (
+ AssetEventResponse,
+ AssetEventsResponse,
+ DagRunAssetReference,
+)
from airflow.api_fastapi.execution_api.datamodels.dagrun import
TriggerDAGRunPayload
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun,
TIRunContext
@@ -33,6 +37,7 @@ class AddPartitionKeyField(VersionChange):
schema(DagRun).field("partition_key").didnt_exist,
schema(AssetEventResponse).field("partition_key").didnt_exist,
schema(TriggerDAGRunPayload).field("partition_key").didnt_exist,
+ schema(DagRunAssetReference).field("partition_key").didnt_exist,
)
@convert_response_to_previous_version_for(TIRunContext) # type:
ignore[arg-type]
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index e0b53fb6d6f..a533f31c31f 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -3228,13 +3228,24 @@ export const $DagRunAssetReference = {
}
],
title: 'Data Interval End'
+ },
+ partition_key: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Partition Key'
}
},
additionalProperties: false,
type: 'object',
- required: ['run_id', 'dag_id', 'logical_date', 'start_date', 'end_date',
'state', 'data_interval_start', 'data_interval_end'],
+ required: ['run_id', 'dag_id', 'logical_date', 'start_date', 'end_date',
'state', 'data_interval_start', 'data_interval_end', 'partition_key'],
title: 'DagRunAssetReference',
- description: 'DAGRun serializer for asset responses.'
+ description: 'DagRun serializer for asset responses.'
} as const;
export const $DagRunState = {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index c409b984d10..0c3ecfe7ce0 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -782,7 +782,7 @@ export type DagProcessorInfoResponse = {
};
/**
- * DAGRun serializer for asset responses.
+ * DagRun serializer for asset responses.
*/
export type DagRunAssetReference = {
run_id: string;
@@ -793,6 +793,7 @@ export type DagRunAssetReference = {
state: string;
data_interval_start: string | null;
data_interval_end: string | null;
+ partition_key: string | null;
};
/**
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
index 607a7474893..da3fba12a6a 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
@@ -815,6 +815,7 @@ class TestGetAssetEvents(TestAssets):
"state": "success",
"data_interval_start":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"data_interval_end":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
+ "partition_key": None,
}
],
"timestamp":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
@@ -843,6 +844,7 @@ class TestGetAssetEvents(TestAssets):
"state": "success",
"data_interval_start":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"data_interval_end":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
+ "partition_key": None,
}
],
"timestamp":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
@@ -998,6 +1000,7 @@ class TestGetAssetEvents(TestAssets):
"state": "success",
"data_interval_start":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"data_interval_end":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
+ "partition_key": None,
}
],
"timestamp":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
@@ -1026,6 +1029,7 @@ class TestGetAssetEvents(TestAssets):
"state": "success",
"data_interval_start":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"data_interval_end":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
+ "partition_key": None,
}
],
"timestamp":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 577acd5fac5..1664103271f 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -1343,12 +1343,17 @@ class TestDeleteDagRun:
class TestGetDagRunAssetTriggerEvents:
@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
- def test_should_respond_200(self, test_client, dag_maker, session):
+ @pytest.mark.parametrize(
+ "partition_key",
+ ["test_partition_key", None],
+ ids=["partitioned", "non-partitioned"],
+ )
+ def test_should_respond_200(self, partition_key, test_client, dag_maker,
session):
asset1 = Asset(name="ds1", uri="file:///da1")
with dag_maker(dag_id="source_dag", start_date=START_DATE1,
session=session):
EmptyOperator(task_id="task", outlets=[asset1])
- dr = dag_maker.create_dagrun()
+ dr = dag_maker.create_dagrun(partition_key=partition_key)
ti = dr.task_instances[0]
asset1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset1.uri))
@@ -1358,12 +1363,17 @@ class TestGetDagRunAssetTriggerEvents:
source_dag_id=ti.dag_id,
source_run_id=ti.run_id,
source_map_index=ti.map_index,
+ partition_key=partition_key,
)
session.add(event)
with dag_maker(dag_id="TEST_DAG_ID", start_date=START_DATE1,
session=session):
pass
- dr = dag_maker.create_dagrun(run_id="TEST_DAG_RUN_ID",
run_type=DagRunType.ASSET_TRIGGERED)
+ dr = dag_maker.create_dagrun(
+ run_id="TEST_DAG_RUN_ID",
+ run_type=DagRunType.ASSET_TRIGGERED,
+ partition_key=partition_key,
+ )
dr.consumed_asset_events.append(event)
session.commit()
@@ -1398,9 +1408,10 @@ class TestGetDagRunAssetTriggerEvents:
"logical_date":
from_datetime_to_zulu_without_ms(dr.logical_date),
"start_date":
from_datetime_to_zulu_without_ms(dr.start_date),
"state": "running",
+ "partition_key": partition_key,
}
],
- "partition_key": None,
+ "partition_key": partition_key,
}
],
"total_entries": 1,
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 921709c0b68..92d77aa4a33 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -339,7 +339,7 @@ class DagProcessorInfoResponse(BaseModel):
class DagRunAssetReference(BaseModel):
"""
- DAGRun serializer for asset responses.
+ DagRun serializer for asset responses.
"""
model_config = ConfigDict(
@@ -353,6 +353,7 @@ class DagRunAssetReference(BaseModel):
state: Annotated[str, Field(title="State")]
data_interval_start: Annotated[datetime | None, Field(title="Data Interval
Start")] = None
data_interval_end: Annotated[datetime | None, Field(title="Data Interval
End")] = None
+ partition_key: Annotated[str | None, Field(title="Partition Key")] = None
class DagRunState(str, Enum):
diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py
b/airflow-ctl/tests/airflow_ctl/api/test_operations.py
index 5e4a8fb9315..ecfa758df32 100644
--- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py
+++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py
@@ -272,6 +272,7 @@ class TestAssetsOperations:
state="RUNNING",
data_interval_start=datetime.datetime(2025, 1, 1, 0, 0, 0),
data_interval_end=datetime.datetime(2025, 1, 1, 0, 0, 0),
+ partition_key=None,
)
asset_event_response = AssetEventResponse(
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index 3c2099ffbf5..32824a48b21 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -94,6 +94,7 @@ class DagRunAssetReference(BaseModel):
state: Annotated[str, Field(title="State")]
data_interval_start: Annotated[AwareDatetime | None, Field(title="Data
Interval Start")] = None
data_interval_end: Annotated[AwareDatetime | None, Field(title="Data
Interval End")] = None
+ partition_key: Annotated[str | None, Field(title="Partition Key")] = None
class DagRunState(str, Enum):