This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new d5620a47b56 [v3-2-test] Aggregate CI-image dependency groups so providers can register non-default extras with a one-line change (#67130) (#67139)
d5620a47b56 is described below

commit d5620a47b56095fdddaa42c16190c43264dd8273
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 2 19:47:11 2026 +0200

    [v3-2-test] Aggregate CI-image dependency groups so providers can register non-default extras with a one-line change (#67130) (#67139)
    
    * Fix OTel timer metrics using Gauge instead of Histogram (#64207) (#66865)
    
    * Fix OTel timer metrics using Gauge instead of Histogram
    
    * Use ExponentialBucketHistogramAggregation for timing metrics
    
    * Use public API import path for ExponentialBucketHistogramAggregation and fix histogram map isolation
    
    (cherry picked from commit b2dadd2b7623d0d99f6fea0521bf008b7b957cac)
    
    Co-authored-by: namratachaudhary <[email protected]>
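
To illustrate why the Gauge-to-Histogram switch matters, here is a minimal pure-Python sketch. It does not use the OpenTelemetry API: `ToyGauge` and `ToyHistogram` are hypothetical stand-ins, and the fixed bucket bounds are a simplification (the commit itself uses ExponentialBucketHistogramAggregation). The point is only that a gauge keeps the last recorded value, while a histogram keeps count, sum, and bucket distribution across recordings.

```python
import bisect


class ToyGauge:
    """Hypothetical stand-in: remembers only the most recent value."""

    def __init__(self):
        self.value = None

    def record(self, value):
        self.value = value


class ToyHistogram:
    """Hypothetical stand-in: accumulates count, sum, and bucket counts."""

    def __init__(self, bounds):
        self.bounds = sorted(bounds)
        # One bucket per upper bound, plus an overflow bucket at the end.
        self.bucket_counts = [0] * (len(self.bounds) + 1)
        self.count = 0
        self.total = 0.0

    def record(self, value):
        # bisect_left places a value equal to a bound into that bound's bucket.
        self.bucket_counts[bisect.bisect_left(self.bounds, value)] += 1
        self.count += 1
        self.total += value


gauge = ToyGauge()
hist = ToyHistogram(bounds=[10, 100, 1000])
for duration_ms in (5, 50, 500, 50):
    gauge.record(duration_ms)
    hist.record(duration_ms)

print(gauge.value)                                  # only the last timing survives
print(hist.count, hist.total, hist.bucket_counts)   # the full distribution survives
```

After four recordings the toy gauge can only report the final timing, whereas the toy histogram can still answer how many timings were seen and how they were distributed.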
    
    * Add name fields to SDK deadline alerts (#64926) (#65601)
    
    * Add deadlines support with name and description fields in alerts and UI
    
    * Add 'viewAll' label to deadlineStatus in dag.json
    
    * Refactor deadlineAlerts referenceType structure
    
    * Refine deadline alert translations in dag.json
    
    Updated deadline alert messages for clarity and consistency.
    
    * Add completion rule text for deadline alerts in UI
    
    * Remove duplicate completionRule entry in deadlineAlerts
    
    * Remove alert description from DeadlineAlert and related models
    
    * Enhance deadline handling: return name updates alongside UUID mapping in SerializedDagModel
    
    * Add alert_id field to DeadlineResponse and update tests for alert handling
    
    * Remove DEADLINES option from MenuItem enum
    
    * Update airflow-core/src/airflow/serialization/encoders.py
    
    
    
    ---------
    
    
    (cherry picked from commit e9d106667c7fe98f77c1312bb082c010b472c388)
    
    Co-authored-by: Richard Wu <[email protected]>
    Co-authored-by: D. Ferruzzi <[email protected]>
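
The "return name updates alongside UUID mapping" idea can be sketched in plain Python. This is a simplified, hypothetical model, not the code in `SerializedDagModel`: `try_reuse` and the plain dicts are assumptions for illustration. It matches definitions on everything except the name, so a rename alone still reuses the UUID, and it reports only the names that actually changed.

```python
def try_reuse(existing, new_definitions):
    """Hypothetical simplification of the UUID-reuse logic.

    existing: dict of UUID string -> definition dict (may include "name").
    new_definitions: list of definition dicts.
    Returns (uuid_mapping, name_updates), or None if any definition has no match.
    """

    def key(definition):
        # The name is deliberately excluded from the comparison.
        return (definition["reference"], definition["interval"], definition["callback"])

    matched = set()
    uuid_mapping = {}
    name_updates = {}
    for definition in new_definitions:
        for uuid_str, old in existing.items():
            if uuid_str not in matched and key(old) == key(definition):
                matched.add(uuid_str)
                uuid_mapping[uuid_str] = definition
                if definition.get("name") != old.get("name"):
                    name_updates[uuid_str] = definition.get("name")
                break
        else:
            # No existing row matches this definition: give up on reuse entirely.
            return None
    return uuid_mapping, name_updates


existing = {
    "uuid-1": {"name": "original name", "reference": "queued_at", "interval": 300.0, "callback": "cb"},
}
renamed = [{"name": "updated name", "reference": "queued_at", "interval": 300.0, "callback": "cb"}]
changed = [{"name": "original name", "reference": "queued_at", "interval": 600.0, "callback": "cb"}]

print(try_reuse(existing, renamed))
print(try_reuse(existing, changed))
```

A caller can then issue one targeted update per entry in `name_updates` and treat a non-empty `name_updates` as "a write happened", which is the behaviour the new test in this commit asserts.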
    
    * [v3-2-test] Allow accessing a TaskGroup's members via `[]` (#64430) (#65707)
    
    (cherry picked from commit aa54031f3cece54b1492bb13648bd521d5bf3082)
    
    Co-authored-by: Dev-iL <[email protected]>
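
As a rough picture of what `[]` access on a group means, the sketch below uses a hypothetical `ToyGroup` class. It is not Airflow's TaskGroup, and the choice of key (a plain label here) is an assumption, since the taskgroup.py change is not visible in this part of the email.

```python
class ToyGroup:
    """Hypothetical stand-in for a task group; not Airflow's TaskGroup."""

    def __init__(self, group_id):
        self.group_id = group_id
        self.children = {}

    def add(self, label, node):
        self.children[label] = node
        return node

    def __getitem__(self, label):
        # Delegate to the children mapping, with a clearer error on a miss.
        try:
            return self.children[label]
        except KeyError:
            raise KeyError(f"{label!r} is not a member of group {self.group_id!r}") from None


etl = ToyGroup("etl")
etl.add("extract", "extract-task")
load = etl.add("load", ToyGroup("load"))
load.add("copy", "copy-task")

print(etl["extract"])
print(etl["load"]["copy"])  # nested groups chain naturally
```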
    
    * [v3-2-test] Skip test_schedule_tis_start_trigger pending #55068 backport decision (#66315)
    
    The `start_from_trigger` deferred-at-schedule path is commented out
    on v3-2-test (disabled in 91e10295c7d as a TODO) and was only
    re-enabled on main by #55068, which landed one day after the v3-2
    branch was cut. The test asserts the feature works and fails on
    v3-2-test CI.
    
    Skip the test on the 3.2 line and link the tracking issue from both
    the test skip-reason and the disabled production-code site so the
    follow-up isn't lost. Decision (backport #55068 or formally drop
    the feature on 3.2) is tracked in #66307.
    
    * [v3-2-test] Aggregate CI-image dependency groups so providers can register non-default extras with a one-line change (#67130)
    (cherry picked from commit e61640e2a1bceb3ee864d2dd552f623ac76bfb2d)
    
    Co-authored-by: Bugra Ozturk <[email protected]>
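
The aggregation relies on dependency groups being able to include other groups. The sketch below is a simplified pure-Python model of that flattening, not uv's implementation. `resolve_group` and the `hypothetical-*` requirement strings are invented for illustration; `some-package>=1.2.3` and `my-non-default-extra` are the placeholders used in the contributing docs added by this commit.

```python
def resolve_group(groups, name, _seen=()):
    """Flatten a dependency group, expanding {"include-group": ...} entries."""
    if name in _seen:
        raise ValueError(f"cycle in dependency groups: {' -> '.join(_seen + (name,))}")
    requirements = []
    for entry in groups[name]:
        if isinstance(entry, dict):
            # An include entry pulls in every requirement of the named group.
            requirements.extend(resolve_group(groups, entry["include-group"], _seen + (name,)))
        else:
            requirements.append(entry)
    return requirements


groups = {
    "dev": ["hypothetical-dev-tool"],
    "docs": ["hypothetical-docs-tool"],
    "docs-gen": ["hypothetical-docs-gen-tool"],
    "leveldb": ["plyvel>=1.5.1"],
    "my-non-default-extra": ["some-package>=1.2.3"],
    "ci-image": [
        {"include-group": "dev"},
        {"include-group": "docs"},
        {"include-group": "docs-gen"},
        {"include-group": "leveldb"},
        {"include-group": "my-non-default-extra"},
    ],
}

ci_image = resolve_group(groups, "ci-image")
print(ci_image)
```

In this model, adding one include line to `ci-image` changes what the CI image installs, while resolving `dev` on its own still leaves the extra out.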
    
    ---------
    
    Co-authored-by: Rahul Vats <[email protected]>
    Co-authored-by: namratachaudhary <[email protected]>
    Co-authored-by: Pierre Jeambrun <[email protected]>
    Co-authored-by: Richard Wu <[email protected]>
    Co-authored-by: D. Ferruzzi <[email protected]>
    Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
    Co-authored-by: Dev-iL <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
    Co-authored-by: Bugra Ozturk <[email protected]>
---
 Dockerfile                                         | 12 ++---
 Dockerfile.ci                                      | 12 ++---
 airflow-core/newsfragments/64207.significant.rst   |  1 +
 .../api_fastapi/core_api/datamodels/ui/deadline.py |  4 +-
 .../api_fastapi/core_api/openapi/_private_ui.yaml  |  9 ++--
 airflow-core/src/airflow/models/serialized_dag.py  | 39 +++++++++++---
 airflow-core/src/airflow/serialization/decoders.py |  1 +
 .../airflow/serialization/definitions/deadline.py  |  2 +
 airflow-core/src/airflow/serialization/encoders.py |  1 +
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts | 11 ++--
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |  2 +-
 .../core_api/routes/ui/test_deadlines.py           | 12 ++---
 .../tests/unit/models/test_deadline_alert.py       |  4 --
 .../tests/unit/models/test_serialized_dag.py       | 62 ++++++++++++++++++++++
 contributing-docs/12_provider_distributions.rst    | 43 +++++++++++++++
 .../utils/constraints_version_check.py             |  8 +--
 pyproject.toml                                     | 13 +++++
 .../docker/install_airflow_when_building_images.sh | 12 ++---
 .../observability/metrics/otel_logger.py           | 58 +++++++++++++++++---
 .../observability/metrics/test_otel_logger.py      | 47 +++++++++++-----
 task-sdk/src/airflow/sdk/definitions/deadline.py   |  2 +
 task-sdk/src/airflow/sdk/definitions/taskgroup.py  |  4 ++
 .../tests/task_sdk/definitions/test_taskgroup.py   | 25 +++++++++
 23 files changed, 311 insertions(+), 73 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 9551f4d4d0d..e8c59d469b5 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1228,8 +1228,8 @@ function install_from_sources() {
         # (binary lxml embeds its own libxml2, while xmlsec uses system one).
         # See https://bugs.launchpad.net/lxml/+bug/2110068
         set -x
-        uv sync --all-packages --resolution highest --group dev --group docs --group docs-gen \
-            --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+        uv sync --all-packages --resolution highest --group ci-image \
+            ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
             --no-python-downloads --no-managed-python
     else
         set +x
@@ -1241,8 +1241,8 @@ function install_from_sources() {
         # libxml2 (binary lxml embeds its own libxml2, while xmlsec uses system one).
         # See https://bugs.launchpad.net/lxml/+bug/2110068
         set -x
-        if ! uv sync --all-packages --frozen --group dev --group docs --group docs-gen \
-            --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+        if ! uv sync --all-packages --frozen --group ci-image \
+            ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
             --no-python-downloads --no-managed-python; then
             set +x
             if [[ ${AIRFLOW_FALLBACK_NO_CONSTRAINTS_INSTALLATION} != "true" ]]; then
@@ -1257,8 +1257,8 @@ function install_from_sources() {
             echo "${COLOR_BLUE}Falling back to re-resolving dependencies (uv sync without --frozen).${COLOR_RESET}"
             echo
             set -x
-            uv sync --all-packages --group dev --group docs --group docs-gen \
-                --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+            uv sync --all-packages --group ci-image \
+                ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
                 --no-python-downloads --no-managed-python
             set +x
         fi
diff --git a/Dockerfile.ci b/Dockerfile.ci
index 4c39038e021..dab1755cf62 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -933,8 +933,8 @@ function install_from_sources() {
         # (binary lxml embeds its own libxml2, while xmlsec uses system one).
         # See https://bugs.launchpad.net/lxml/+bug/2110068
         set -x
-        uv sync --all-packages --resolution highest --group dev --group docs --group docs-gen \
-            --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+        uv sync --all-packages --resolution highest --group ci-image \
+            ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
             --no-python-downloads --no-managed-python
     else
         set +x
@@ -946,8 +946,8 @@ function install_from_sources() {
         # libxml2 (binary lxml embeds its own libxml2, while xmlsec uses system one).
         # See https://bugs.launchpad.net/lxml/+bug/2110068
         set -x
-        if ! uv sync --all-packages --frozen --group dev --group docs --group docs-gen \
-            --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+        if ! uv sync --all-packages --frozen --group ci-image \
+            ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
             --no-python-downloads --no-managed-python; then
             set +x
             if [[ ${AIRFLOW_FALLBACK_NO_CONSTRAINTS_INSTALLATION} != "true" ]]; then
@@ -962,8 +962,8 @@ function install_from_sources() {
             echo "${COLOR_BLUE}Falling back to re-resolving dependencies (uv sync without --frozen).${COLOR_RESET}"
             echo
             set -x
-            uv sync --all-packages --group dev --group docs --group docs-gen \
-                --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+            uv sync --all-packages --group ci-image \
+                ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
                 --no-python-downloads --no-managed-python
             set +x
         fi
diff --git a/airflow-core/newsfragments/64207.significant.rst b/airflow-core/newsfragments/64207.significant.rst
new file mode 100644
index 00000000000..3254fa20a54
--- /dev/null
+++ b/airflow-core/newsfragments/64207.significant.rst
@@ -0,0 +1 @@
+OTel timer and timing metrics now use Histogram instead of Gauge, preserving count, sum, and bucket distribution across recordings.
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py
index 1ceddeac318..7d616d0449d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py
@@ -33,10 +33,8 @@ class DeadlineResponse(BaseModel):
     deadline_time: datetime
     missed: bool
     created_at: datetime
+    alert_id: UUID | None = Field(validation_alias="deadline_alert_id", default=None)
     alert_name: str | None = Field(validation_alias=AliasPath("deadline_alert", "name"), default=None)
-    alert_description: str | None = Field(
-        validation_alias=AliasPath("deadline_alert", "description"), default=None
-    )
 
 
 class DeadlineCollectionResponse(BaseModel):
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index 63ea1bc04d0..007caac1b1d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -2284,16 +2284,17 @@ components:
           type: string
           format: date-time
           title: Created At
-        alert_name:
+        alert_id:
           anyOf:
           - type: string
+            format: uuid
           - type: 'null'
-          title: Alert Name
-        alert_description:
+          title: Alert Id
+        alert_name:
           anyOf:
           - type: string
           - type: 'null'
-          title: Alert Description
+          title: Alert Name
       type: object
       required:
       - id
diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index 18ad03546ef..be55c3e0908 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -431,7 +431,7 @@ class SerializedDagModel(Base):
         existing_deadline_uuids: list[str],
         new_deadline_data: list[dict],
         session: Session,
-    ) -> dict[str, dict] | None:
+    ) -> tuple[dict[str, dict], dict[str, str | None]] | None:
         """
         Try to reuse existing deadline UUIDs if the deadline definitions haven't changed.
 
@@ -440,7 +440,11 @@ class SerializedDagModel(Base):
         :param existing_deadline_uuids: List of UUID strings from existing serialized Dag
         :param new_deadline_data: List of new deadline alert data dicts from the Dag
         :param session: Database session
-        :return: UUID mapping dict if all match, None if any mismatch detected
+        :return: Tuple of (uuid_mapping, name_updates) if all definitions match, None if any
+            mismatch detected.  ``uuid_mapping`` maps UUID string → new deadline data dict.
+            ``name_updates`` maps UUID string → new name **only** for entries whose name
+            changed relative to the existing DB row, so callers can issue targeted UPDATEs
+            and reliably detect whether any DB write occurred.
         """
         # defensive check for old 3.1.x format
         if existing_deadline_uuids and not isinstance(existing_deadline_uuids[0], str):
@@ -468,6 +472,7 @@ class SerializedDagModel(Base):
 
         matched_uuids: set[UUID] = set()
         uuid_mapping: dict[str, dict] = {}
+        name_updates: dict[str, str | None] = {}
 
         for deadline_alert in new_deadline_data:
             deadline_data = deadline_alert.get(Encoding.VAR, deadline_alert)
@@ -479,9 +484,13 @@ class SerializedDagModel(Base):
 
                 if _definitions_match(deadline_data, existing_alert):
                     # Found a match, reuse this UUID
-                    uuid_mapping[str(existing_alert.id)] = deadline_data
+                    uuid_str = str(existing_alert.id)
+                    uuid_mapping[uuid_str] = deadline_data
                     matched_uuids.add(existing_alert.id)
                     found_match = True
+                    new_name = deadline_data.get(DeadlineAlertFields.NAME)
+                    if new_name != existing_alert.name:
+                        name_updates[uuid_str] = new_name
                     break
 
             if not found_match:
@@ -490,7 +499,7 @@ class SerializedDagModel(Base):
                 # to another deadline), so partial reuse would risk stale cross-references.
                 return None
 
-        return uuid_mapping
+        return uuid_mapping, name_updates
 
     @classmethod
     def _create_deadline_alert_records(
@@ -510,6 +519,7 @@ class SerializedDagModel(Base):
         for uuid_str, deadline_data in uuid_mapping.items():
             alert = DeadlineAlertModel(
                 id=UUID(uuid_str),
+                name=deadline_data.get(DeadlineAlertFields.NAME),
                 reference=deadline_data[DeadlineAlertFields.REFERENCE],
                 interval=deadline_data[DeadlineAlertFields.INTERVAL],
                 callback_def=deadline_data[DeadlineAlertFields.CALLBACK],
@@ -625,6 +635,7 @@ class SerializedDagModel(Base):
         serialized_dag_hash = _prefetched.dag_hash
         dag_version = _prefetched.dag_version
 
+        name_updated = False
         if dag.data.get("dag", {}).get("deadline"):
             # Try to reuse existing deadline UUIDs if the deadline definitions haven't changed.
             # This preserves the hash and avoids unnecessary SerializedDagModel recreations.
@@ -637,16 +648,24 @@ class SerializedDagModel(Base):
                 and existing_serialized_dag.data
                 and (existing_deadline_uuids := existing_serialized_dag.data.get("dag", {}).get("deadline"))
             ):
-                deadline_uuid_mapping = cls._try_reuse_deadline_uuids(
+                reuse_result = cls._try_reuse_deadline_uuids(
                     existing_deadline_uuids,
                     dag.data["dag"]["deadline"],
                     session,
                 )
 
-                if deadline_uuid_mapping is not None:
+                if reuse_result is not None:
+                    deadline_uuid_mapping, name_updates = reuse_result
                     # All deadlines matched — reuse the UUIDs to preserve hash.
-                    # Clear the mapping since the alert rows already exist in the DB;
-                    # no need to delete and recreate identical records.
+                    # Only issue UPDATE statements for rows whose name actually changed to
+                    # avoid unnecessary writes and to make the return value accurate.
+                    for uuid_str, new_name in name_updates.items():
+                        session.execute(
+                            update(DeadlineAlertModel)
+                            .where(DeadlineAlertModel.id == UUID(uuid_str))
+                            .values(name=new_name)
+                        )
+                    name_updated = bool(name_updates)
                     dag.data["dag"]["deadline"] = existing_deadline_uuids
                     deadline_uuid_mapping = {}
                 else:
@@ -665,6 +684,10 @@ class SerializedDagModel(Base):
             and dag_version
             and dag_version.bundle_name == bundle_name
         ):
+            if name_updated:
+                # The serialized DAG itself is unchanged, but deadline alert name(s) were
+                # updated in the DB, so report True so callers know a write did occur.
+                return True
             log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
             return False
 
diff --git a/airflow-core/src/airflow/serialization/decoders.py b/airflow-core/src/airflow/serialization/decoders.py
index a25068443e4..27db6134b13 100644
--- a/airflow-core/src/airflow/serialization/decoders.py
+++ b/airflow-core/src/airflow/serialization/decoders.py
@@ -166,6 +166,7 @@ def decode_deadline_alert(encoded_data: dict):
         reference=reference,
         interval=datetime.timedelta(seconds=data[DeadlineAlertFields.INTERVAL]),
         callback=deserialize(data[DeadlineAlertFields.CALLBACK]),
+        name=data.get(DeadlineAlertFields.NAME),
     )
 
 
diff --git a/airflow-core/src/airflow/serialization/definitions/deadline.py b/airflow-core/src/airflow/serialization/definitions/deadline.py
index 061883c6fcd..58eaa46e6f7 100644
--- a/airflow-core/src/airflow/serialization/definitions/deadline.py
+++ b/airflow-core/src/airflow/serialization/definitions/deadline.py
@@ -49,6 +49,7 @@ class DeadlineAlertFields:
     serializing DeadlineAlert instances to and from their dictionary representation.
     """
 
+    NAME = "name"
     REFERENCE = "reference"
     INTERVAL = "interval"
     CALLBACK = "callback"
@@ -367,3 +368,4 @@ class SerializedDeadlineAlert:
     reference: SerializedReferenceModels.SerializedBaseDeadlineReference
     interval: timedelta
     callback: Any
+    name: str | None = None
diff --git a/airflow-core/src/airflow/serialization/encoders.py b/airflow-core/src/airflow/serialization/encoders.py
index 2f30511a1e5..87d7ef1f137 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -211,6 +211,7 @@ def encode_deadline_alert(d: DeadlineAlert | SerializedDeadlineAlert) -> dict[st
     from airflow.sdk.serde import serialize
 
     return {
+        "name": d.name,
         "reference": encode_deadline_reference(d.reference),
         "interval": d.interval.total_seconds(),
         "callback": serialize(d.callback),
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 1ae88b9433e..a5746641938 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -8128,18 +8128,19 @@ export const $DeadlineResponse = {
             format: 'date-time',
             title: 'Created At'
         },
-        alert_name: {
+        alert_id: {
             anyOf: [
                 {
-                    type: 'string'
+                    type: 'string',
+                    format: 'uuid'
                 },
                 {
                     type: 'null'
                 }
             ],
-            title: 'Alert Name'
+            title: 'Alert Id'
         },
-        alert_description: {
+        alert_name: {
             anyOf: [
                 {
                     type: 'string'
@@ -8148,7 +8149,7 @@ export const $DeadlineResponse = {
                     type: 'null'
                 }
             ],
-            title: 'Alert Description'
+            title: 'Alert Name'
         }
     },
     type: 'object',
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index cdd4d73211b..1c43301db52 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -2008,8 +2008,8 @@ export type DeadlineResponse = {
     deadline_time: string;
     missed: boolean;
     created_at: string;
+    alert_id?: string | null;
     alert_name?: string | null;
-    alert_description?: string | null;
 };
 
 /**
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_deadlines.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_deadlines.py
index 4dec594eb64..fb69818fa18 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_deadlines.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_deadlines.py
@@ -52,7 +52,6 @@ RUN_MULTI = "run_multi"  # 3 deadlines added out-of-order (ordering test)
 RUN_OTHER = "run_other"  # has 1 deadline; used to verify per-run isolation
 
 ALERT_NAME = "SLA Breach Alert"
-ALERT_DESCRIPTION = "Fires when SLA is breached"
 
 _CALLBACK_PATH = "tests.unit.api_fastapi.core_api.routes.ui.test_deadlines._noop_callback"
 
@@ -154,7 +153,6 @@ def setup(dag_maker, session):
     alert = DeadlineAlert(
         serialized_dag_id=serialized_dag.id,
         name=ALERT_NAME,
-        description=ALERT_DESCRIPTION,
         reference=DeadlineReference.DAGRUN_QUEUED_AT.serialize_reference(),
         interval=3600.0,
         callback_def={"path": _CALLBACK_PATH},
@@ -226,7 +224,7 @@ class TestGetDagRunDeadlines:
         assert deadline1["deadline_time"] == "2025-01-01T12:00:00Z"
         assert deadline1["missed"] is False
         assert deadline1["alert_name"] is None
-        assert deadline1["alert_description"] is None
+        assert deadline1["alert_id"] is None
         assert "id" in deadline1
         assert "created_at" in deadline1
 
@@ -237,14 +235,16 @@ class TestGetDagRunDeadlines:
         assert data["total_entries"] == 1
         assert data["deadlines"][0]["missed"] is True
 
-    def test_deadline_with_alert_name_and_description(self, test_client):
+    def test_deadline_with_alert_name(self, test_client, session):
+        alert = session.scalar(select(DeadlineAlert).where(DeadlineAlert.name == ALERT_NAME))
         with assert_queries_count(4):
             response = test_client.get(f"/dags/{DAG_ID}/dagRuns/{RUN_ALERT}/deadlines")
         assert response.status_code == 200
         data = response.json()
         assert data["total_entries"] == 1
-        assert data["deadlines"][0]["alert_name"] == ALERT_NAME
-        assert data["deadlines"][0]["alert_description"] == ALERT_DESCRIPTION
+        dl = data["deadlines"][0]
+        assert dl["alert_name"] == ALERT_NAME
+        assert dl["alert_id"] == str(alert.id)
 
     def test_deadlines_ordered_by_deadline_time_ascending(self, test_client):
         with assert_queries_count(4):
diff --git a/airflow-core/tests/unit/models/test_deadline_alert.py b/airflow-core/tests/unit/models/test_deadline_alert.py
index 9d69577a6d6..a9b1854f6ab 100644
--- a/airflow-core/tests/unit/models/test_deadline_alert.py
+++ b/airflow-core/tests/unit/models/test_deadline_alert.py
@@ -34,7 +34,6 @@ from unit.models import DEFAULT_DATE
 
 DAG_ID = "test_deadline_alert_dag"
 DEADLINE_NAME = "Test Alert"
-DEADLINE_DESCRIPTION = "This is a test alert description"
 DEADLINE_INTERVAL = 60
 DEADLINE_CALLBACK = {"path": "test.callback"}
 SERIALIZED_DAG_ID = "serialized_dag_uuid"
@@ -62,7 +61,6 @@ def deadline_alert_orm(dag_maker, session, deadline_reference):
         alert = DeadlineAlert(
             serialized_dag_id=serialized_dag.id,
             name=DEADLINE_NAME,
-            description=DEADLINE_DESCRIPTION,
             reference=deadline_reference,
             interval=DEADLINE_INTERVAL,
             callback_def=DEADLINE_CALLBACK,
@@ -86,7 +84,6 @@ class TestDeadlineAlert:
         assert deadline_alert_orm.id is not None
         assert deadline_alert_orm.created_at == DEFAULT_DATE
         assert deadline_alert_orm.name == DEADLINE_NAME
-        assert deadline_alert_orm.description == DEADLINE_DESCRIPTION
 
     def test_minimal_deadline_alert_creation(self, dag_maker, session, deadline_reference):
         with dag_maker(DAG_ID, session=session):
@@ -109,7 +106,6 @@ class TestDeadlineAlert:
             assert deadline_alert.id is not None
             assert deadline_alert.created_at == DEFAULT_DATE
             assert deadline_alert.name is None
-            assert deadline_alert.description is None
 
     def test_deadline_alert_repr(self, deadline_alert_orm, deadline_reference):
         repr_str = repr(deadline_alert_orm)
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py
index 2185635590e..54438f8e82f 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -850,3 +850,65 @@ class TestSerializedDagModel:
         assert new_serdag_count == 2
         assert new_serdag.dag_hash != orig_serdag.dag_hash
         assert new_alert.interval == 600.0
+
+    def test_deadline_name_change_updates_db_and_returns_true(self, testing_dag_bundle, session):
+        """Name-only deadline change: UUID reused, DB row updated, write_dag returns True."""
+        dag_id = "test_deadline_name_change"
+
+        dag = DAG(
+            dag_id=dag_id,
+            deadline=DeadlineAlert(
+                reference=DeadlineReference.DAGRUN_QUEUED_AT,
+                interval=timedelta(minutes=5),
+                callback=AsyncCallback(empty_callback_for_deadline),
+                name="original name",
+            ),
+        )
+        EmptyOperator(task_id="task1", dag=dag)
+        scheduler_dag = sync_dag_to_db(dag, session=session)
+        scheduler_dag.create_dagrun(
+            run_id="test1",
+            run_after=DEFAULT_DATE,
+            state=DagRunState.QUEUED,
+            logical_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            triggered_by=DagRunTriggeredByType.TEST,
+            run_type=DagRunType.MANUAL,
+        )
+        session.commit()
+
+        orig_serdag = session.scalar(select(SDM).where(SDM.dag_id == dag_id).order_by(SDM.created_at.desc()))
+        orig_hash = orig_serdag.dag_hash
+        orig_alert = session.scalar(select(DAM).where(DAM.serialized_dag_id == orig_serdag.id))
+        orig_uuid = orig_alert.id
+
+        # Change only the name — reference, interval, and callback are identical.
+        dag.deadline = DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=5),
+            callback=AsyncCallback(empty_callback_for_deadline),
+            name="updated name",
+        )
+
+        did_write = SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="testing", session=session)
+        session.commit()
+
+        # write_dag must report True because a DB write (name UPDATE) did occur.
+        assert did_write is True
+
+        serdag_count = session.scalar(select(func.count()).select_from(SDM).where(SDM.dag_id == dag_id))
+        latest_serdag = session.scalar(
+            select(SDM).where(SDM.dag_id == dag_id).order_by(SDM.created_at.desc())
+        )
+        updated_alert = session.scalar(select(DAM).where(DAM.id == orig_uuid))
+
+        # No new SerializedDagModel row — UUID was reused so the hash is unchanged.
+        assert serdag_count == 1
+        assert latest_serdag.dag_hash == orig_hash
+
+        # The DeadlineAlert row must still use the same UUID.
+        assert updated_alert is not None
+        assert updated_alert.id == orig_uuid
+
+        # The name must have been updated in the DB.
+        assert updated_alert.name == "updated name"
diff --git a/contributing-docs/12_provider_distributions.rst b/contributing-docs/12_provider_distributions.rst
index 523ad2a8fca..8e376f61b01 100644
--- a/contributing-docs/12_provider_distributions.rst
+++ b/contributing-docs/12_provider_distributions.rst
@@ -117,6 +117,49 @@ you should do it before you make a PR with such changed dependency changes
 Also, you should rebuild the image ``breeze ci-image build`` or answer ``y`` when you are asked to rebuild the
 image for the new dependencies to be used in the Breeze CI environment.
 
+Non-default provider extras
+---------------------------
+
+Some providers depend on packages that cannot be installed in CI by default — for example a
+proprietary client library (IBM MQ's ``ibmmq``) or a native library that requires system packages
+(Google's ``leveldb``/``plyvel`` needs ``libleveldb-dev``). Pulling these into the default
+``uv sync`` would break CI on every runner that doesn't have the prerequisite installed.
+
+For these cases, declare the dependency as an extra on the provider and register it as its own
+group at the root, without adding it to ``dev``:
+
+1. **In the provider's ``pyproject.toml``** — keep the package under ``[project.optional-dependencies]``
+   so users can opt in with ``pip install apache-airflow-providers-<id>[<extra>]``.
+
+2. **In the root ``pyproject.toml``** — add a dedicated entry under ``[dependency-groups]``,
+   then include it from the ``ci-image`` aggregate group. The ``ci-image`` group is the single
+   source of truth referenced by ``Dockerfile``, ``Dockerfile.ci``,
+   ``scripts/docker/install_airflow_when_building_images.sh``, and the breeze constraints
+   checker — adding your group there is the only change required for the CI image to pick it up:
+
+   .. code:: toml
+
+       [dependency-groups]
+       my-non-default-extra = [
+           "some-package>=1.2.3",
+       ]
+
+       ci-image = [
+           {include-group = "dev"},
+           {include-group = "docs"},
+           {include-group = "docs-gen"},
+           {include-group = "leveldb"},
+           {include-group = "my-non-default-extra"},
+       ]
+
+3. **If system libraries are required**, add them to ``scripts/docker/install_os_dependencies.sh``
+   so the CI image has the prerequisites before ``uv sync`` runs.
+
+Because the new group is *not* part of ``dev``, a plain ``uv sync`` on a contributor's machine
+will not try to install it. The CI image installs it via ``ci-image``; provider unit tests that
+import the proprietary or hard-to-build module should mock it (see ``providers/google/leveldb``
+for the established pattern).
+
 Provider's cross-dependencies
 -----------------------------
 
diff --git a/dev/breeze/src/airflow_breeze/utils/constraints_version_check.py b/dev/breeze/src/airflow_breeze/utils/constraints_version_check.py
index 75c5ee22af5..ced0564d855 100755
--- a/dev/breeze/src/airflow_breeze/utils/constraints_version_check.py
+++ b/dev/breeze/src/airflow_breeze/utils/constraints_version_check.py
@@ -593,10 +593,10 @@ def explain_package_upgrade(
     additional_args = []
     if airflow_constraints_mode == "constraints-source-providers":
         # In case of source constraints we also need to add all development dependencies
-        # to reflect exactly what is installed in the CI image by default
-        additional_args.extend(
-            ["--group", "dev", "--group", "docs", "--group", "docs-gen", "--group", "leveldb"]
-        )
+        # to reflect exactly what is installed in the CI image by default. The ``ci-image``
+        # group aggregates dev/docs/docs-gen plus any hard-to-install provider extras
+        # (see root pyproject.toml).
+        additional_args.extend(["--group", "ci-image"])
     with (
         preserve_pyproject_file(AIRFLOW_ROOT_PATH / "pyproject.toml") as airflow_pyproject,
         preserve_pyproject_file(AIRFLOW_ROOT_PATH / "uv.lock"),
diff --git a/pyproject.toml b/pyproject.toml
index e9ee8bc533c..755f5cbd231 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1343,6 +1343,19 @@ leveldb = [
     "plyvel>=1.5.1"
 ]
 
+# Aggregate of every group the CI image installs by default. Single source of
+# truth referenced from the Dockerfiles, the image install script, and the
+# breeze constraints checker. When a provider needs to register a
+# hard-to-install extra (proprietary client, needs system libs, etc.), add a
+# dedicated [dependency-groups] entry above and include it here — no Dockerfile
+# or breeze edits required. See contributing-docs/12_provider_distributions.rst.
+ci-image = [
+    {include-group = "dev"},
+    {include-group = "docs"},
+    {include-group = "docs-gen"},
+    {include-group = "leveldb"},
+]
+
 [tool.uv]
 # Bump this only when the project actually relies on a newer uv feature/fix. It is a
 # minimum contributors must install, NOT the uv CI pins to — keeping it in lockstep
diff --git a/scripts/docker/install_airflow_when_building_images.sh b/scripts/docker/install_airflow_when_building_images.sh
index 85f879786f3..772465678a8 100644
--- a/scripts/docker/install_airflow_when_building_images.sh
+++ b/scripts/docker/install_airflow_when_building_images.sh
@@ -57,8 +57,8 @@ function install_from_sources() {
         # (binary lxml embeds its own libxml2, while xmlsec uses system one).
         # See https://bugs.launchpad.net/lxml/+bug/2110068
         set -x
-        uv sync --all-packages --resolution highest --group dev --group docs --group docs-gen \
-            --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+        uv sync --all-packages --resolution highest --group ci-image \
+            ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
             --no-python-downloads --no-managed-python
     else
         set +x
@@ -70,8 +70,8 @@ function install_from_sources() {
         # libxml2 (binary lxml embeds its own libxml2, while xmlsec uses system one).
         # See https://bugs.launchpad.net/lxml/+bug/2110068
         set -x
-        if ! uv sync --all-packages --frozen --group dev --group docs --group docs-gen \
-            --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+        if ! uv sync --all-packages --frozen --group ci-image \
+            ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
             --no-python-downloads --no-managed-python; then
             set +x
             if [[ ${AIRFLOW_FALLBACK_NO_CONSTRAINTS_INSTALLATION} != "true" ]]; then
@@ -86,8 +86,8 @@ function install_from_sources() {
             echo "${COLOR_BLUE}Falling back to re-resolving dependencies (uv sync without --frozen).${COLOR_RESET}"
             echo
             set -x
-            uv sync --all-packages --group dev --group docs --group docs-gen \
-                --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+            uv sync --all-packages --group ci-image \
+                ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
                 --no-python-downloads --no-managed-python
             set +x
         fi
diff --git a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
index c8db3ee02f9..b7b8effe8dc 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
+++ b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
@@ -30,6 +30,7 @@ from opentelemetry.sdk.metrics._internal.export import (
     ConsoleMetricExporter,
     PeriodicExportingMetricReader,
 )
+from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation, View
 from opentelemetry.sdk.resources import SERVICE_NAME, Resource
 
 from ..common import get_otel_data_exporter
@@ -146,7 +147,8 @@ class _OtelTimer(Timer):
     """
     An implementation of Stats.Timer() which records the result in the OTel Metrics Map.
 
-    OpenTelemetry does not have a native timer, we will store the values as a Gauge.
+    OpenTelemetry does not have a native timer; values are stored as a Histogram so that
+    all observations (count, sum, bucket distribution) are preserved across multiple recordings.
 
     :param name: The name of the timer.
     :param tags: Tags to append to the timer.
@@ -160,9 +162,9 @@ class _OtelTimer(Timer):
 
     def stop(self, send: bool = True) -> None:
         super().stop(send)
-        if self.name and send and self.duration:
-            self.otel_logger.metrics_map.set_gauge_value(
-                full_name(prefix=self.otel_logger.prefix, name=self.name), self.duration, False, self.tags
+        if self.name and send and self.duration is not None:
+            self.otel_logger.metrics_map.record_histogram_value(
+                full_name(prefix=self.otel_logger.prefix, name=self.name), self.duration, self.tags
             )
 
 
@@ -278,11 +280,11 @@ class SafeOtelLogger:
         *,
         tags: Attributes = None,
     ) -> None:
-        """OTel does not have a native timer, stored as a Gauge whose value is elapsed ms."""
+        """Record a timing observation as a Histogram to preserve distribution information."""
         if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
             if isinstance(dt, datetime.timedelta):
                 dt = dt.total_seconds() * 1000.0
-            self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), float(dt), False, tags)
+            self.metrics_map.record_histogram_value(full_name(prefix=self.prefix, name=stat), float(dt), tags)
 
     def timer(
         self,
@@ -314,15 +316,29 @@ class InternalGauge:
         self.gauge.set(new_value, attributes=self.attributes)
 
 
+class InternalHistogram:
+    """Stores a histogram instrument for timer/timing metrics."""
+
+    def __init__(self, meter, name: str):
+        otel_safe_name = _get_otel_safe_name(name)
+        self.histogram = meter.create_histogram(name=otel_safe_name, unit="ms")
+        log.debug("Created %s as type: %s", otel_safe_name, _type_as_str(self.histogram))
+
+    def record(self, value: float, tags: Attributes) -> None:
+        self.histogram.record(value, attributes=tags)
+
+
 class MetricsMap:
     """Stores Otel Instruments."""
 
     def __init__(self, meter):
         self.meter = meter
         self.map = {}
+        self.histograms: dict[str, InternalHistogram] = {}
 
     def clear(self) -> None:
         self.map.clear()
+        self.histograms.clear()
 
     def _create_counter(self, name):
         """Create a new counter or up_down_counter for the provided name."""
@@ -376,6 +392,21 @@ class MetricsMap:
 
         self.map[key].set_value(value, delta)
 
+    def record_histogram_value(self, name: str, value: float, tags: Attributes) -> None:
+        """
+        Record a timing observation in a Histogram instrument.
+
+        Unlike a Gauge, a Histogram accumulates all observations so that count, sum,
+        and bucket distribution are preserved across multiple recordings.
+
+        :param name: The name of the histogram to record.
+        :param value: The timing observation in milliseconds.
+        :param tags: Attributes to attach to the observation.
+        """
+        if name not in self.histograms:
+            self.histograms[name] = InternalHistogram(meter=self.meter, name=name)
+        self.histograms[name].record(value, tags)
+
 
 def flush_otel_metrics():
     provider = metrics.get_meter_provider()
@@ -400,6 +431,15 @@ def get_otel_logger(
     stat_name_handler: Callable[[str], str] | None = None,
     statsd_influxdb_enabled: bool = False,
 ) -> SafeOtelLogger:
+    """
+    Build and return a :class:`SafeOtelLogger` backed by a configured :class:`MeterProvider`.
+
+    Histogram instruments (used for ``timing()`` / ``timer()`` metrics) are aggregated with
+    :class:`~opentelemetry.sdk.metrics.view.ExponentialBucketHistogramAggregation`
+    so that bucket boundaries adapt automatically to the observed data range.  This avoids
+    the need to hand-tune explicit bucket boundaries for metrics that span very different
+    scales (milliseconds to hours).
+    """
     otel_env_config = load_metrics_env_config()
 
     effective_service_name: str = otel_env_config.service_name or service_name or "airflow"
@@ -453,6 +493,12 @@ def get_otel_logger(
         MeterProvider(
             resource=resource,
             metric_readers=readers,
+            views=[
+                View(
+                    instrument_type=metrics.Histogram,
+                    aggregation=ExponentialBucketHistogramAggregation(),
+                )
+            ],
             shutdown_on_exit=False,
         ),
     )
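Note on the ``otel_logger.py`` hunks above: two behaviours change together. The old ``stop()`` guard used the truthiness of ``self.duration``, so a duration of ``0.0`` was silently dropped, and the gauge kept only the most recent value. The following sketch illustrates both effects with plain Python stand-ins. ``LastValueGauge``, ``SimpleHistogram``, ``should_send_old`` and ``should_send_new`` are hypothetical names for this illustration and are not OpenTelemetry or Airflow APIs; bucket handling is deliberately omitted.

```python
class LastValueGauge:
    """Stand-in for a gauge: remembers only the latest value."""

    def __init__(self):
        self.value = None

    def set(self, value: float) -> None:
        self.value = value


class SimpleHistogram:
    """Stand-in for a histogram: accumulates count and sum (buckets omitted)."""

    def __init__(self):
        self.count = 0
        self.total = 0.0

    def record(self, value: float) -> None:
        self.count += 1
        self.total += value


def should_send_old(duration) -> bool:
    # Mirrors the removed guard: a truthiness check, so 0.0 counts as "nothing to send".
    return bool(duration)


def should_send_new(duration) -> bool:
    # Mirrors the added guard: only a missing duration (None) is skipped.
    return duration is not None


durations_ms = [120.0, 0.0, 80.0]
gauge = LastValueGauge()
histogram = SimpleHistogram()
for duration in durations_ms:
    if should_send_old(duration):
        gauge.set(duration)
    if should_send_new(duration):
        histogram.record(duration)
```

After the loop the gauge stand-in holds only the last non-zero value, while the histogram stand-in has seen all three observations, including the zero-length one.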
diff --git a/shared/observability/tests/observability/metrics/test_otel_logger.py b/shared/observability/tests/observability/metrics/test_otel_logger.py
index f7b348354d7..3c1369ec44b 100644
--- a/shared/observability/tests/observability/metrics/test_otel_logger.py
+++ b/shared/observability/tests/observability/metrics/test_otel_logger.py
@@ -25,6 +25,7 @@ from unittest import mock
 
 import pytest
 from opentelemetry.metrics import MeterProvider
+from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation, View
 
 from airflow_shared.observability.common import get_otel_data_exporter
 from airflow_shared.observability.exceptions import InvalidStatsNameException
@@ -244,25 +245,28 @@ class TestOtelMetrics:
 
         self.stats.timing(name, dt=datetime.timedelta(seconds=123))
 
-        self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
-        expected_value = 123000.0
-        assert self.map[full_name(name)].value == expected_value
+        self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name), unit="ms")
+        self.meter.get_meter().create_histogram.return_value.record.assert_called_once_with(
+            123000.0, attributes=None
+        )
 
     def test_timing_new_metric_with_tags(self, name):
         tags = {"hello": "world"}
-        key = _generate_key_name(full_name(name), tags)
 
         self.stats.timing(name, dt=1, tags=tags)
 
-        self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
-        self.map[key].attributes == tags
+        self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name), unit="ms")
+        self.meter.get_meter().create_histogram.return_value.record.assert_called_once_with(
+            1.0, attributes=tags
+        )
 
     def test_timing_existing_metric(self, name):
         self.stats.timing(name, dt=1)
         self.stats.timing(name, dt=2)
 
-        self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
-        assert self.map[full_name(name)].value == 2
+        # histogram created only once, but both observations are recorded
+        self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name), unit="ms")
+        assert self.meter.get_meter().create_histogram.return_value.record.call_count == 2
 
     # For the four test_timer_foo tests below:
     #   time.perf_count() is called once to get the starting timestamp and again
@@ -277,7 +281,7 @@ class TestOtelMetrics:
         expected_duration = 3140.0
         assert timer.duration == expected_duration
         assert mock_time.call_count == 2
-        self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
+        self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name), unit="ms")
 
     @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
     def test_timer_no_name_returns_float_but_does_not_store_value(self, mock_time, name):
@@ -288,7 +292,7 @@ class TestOtelMetrics:
         expected_duration = 3140.0
         assert timer.duration == expected_duration
         assert mock_time.call_count == 2
-        self.meter.get_meter().create_gauge.assert_not_called()
+        self.meter.get_meter().create_histogram.assert_not_called()
 
     @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
     def test_timer_start_and_stop_manually_send_false(self, mock_time, name):
@@ -301,7 +305,7 @@ class TestOtelMetrics:
         expected_value = 3140.0
         assert timer.duration == expected_value
         assert mock_time.call_count == 2
-        self.meter.get_meter().create_gauge.assert_not_called()
+        self.meter.get_meter().create_histogram.assert_not_called()
 
     @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
     def test_timer_start_and_stop_manually_send_true(self, mock_time, name):
@@ -314,7 +318,7 @@ class TestOtelMetrics:
         expected_value = 3140.0
         assert timer.duration == expected_value
         assert mock_time.call_count == 2
-        self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
+        self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name), unit="ms")
 
     @pytest.mark.parametrize(
         (
@@ -415,6 +419,18 @@ class TestOtelMetrics:
                 == f"opentelemetry.exporter.otlp.proto.{expected_exporter_module}.metric_exporter"
             )
 
+    @mock.patch("airflow_shared.observability.metrics.otel_logger.metrics")
+    @mock.patch("airflow_shared.observability.metrics.otel_logger.MeterProvider")
+    def test_get_otel_logger_uses_exponential_histogram_view(self, mock_provider, mock_metrics):
+        get_otel_logger(host="localhost", port=4318)
+
+        call_kwargs = mock_provider.call_args.kwargs
+        views = call_kwargs["views"]
+        assert len(views) == 1
+        view = views[0]
+        assert isinstance(view, View)
+        assert isinstance(view._aggregation, ExponentialBucketHistogramAggregation)
+
     def test_atexit_flush_on_process_exit(self):
         """
         Run a process that initializes a logger, creates a stat and then exits.
@@ -422,8 +438,11 @@ class TestOtelMetrics:
         The logger initialization registers an atexit hook.
         Test that the hook runs and flushes the created stat at shutdown.
         """
-        test_module_name = "tests.observability.metrics.test_otel_logger"
-        function_call_str = f"import {test_module_name} as m; m.mock_service_run()"
+        function_call_str = (
+            "from airflow_shared.observability.metrics.otel_logger import get_otel_logger; "
+            "logger = get_otel_logger(debug=True); "
+            "logger.incr('my_test_stat')"
+        )
 
         proc = subprocess.run(
             [sys.executable, "-c", function_call_str],
diff --git a/task-sdk/src/airflow/sdk/definitions/deadline.py b/task-sdk/src/airflow/sdk/definitions/deadline.py
index 2fe220e789d..7164eeec076 100644
--- a/task-sdk/src/airflow/sdk/definitions/deadline.py
+++ b/task-sdk/src/airflow/sdk/definitions/deadline.py
@@ -145,9 +145,11 @@ class DeadlineAlert:
         reference: DeadlineReferenceType,
         interval: timedelta,
         callback: Callback,
+        name: str | None = None,
     ):
         self.reference = reference
         self.interval = interval
+        self.name = name
 
         if not isinstance(callback, (AsyncCallback, SyncCallback)):
             raise ValueError(f"Callbacks of type {type(callback).__name__} are not currently supported")
diff --git a/task-sdk/src/airflow/sdk/definitions/taskgroup.py b/task-sdk/src/airflow/sdk/definitions/taskgroup.py
index 97d97148170..25bb3e5dd81 100644
--- a/task-sdk/src/airflow/sdk/definitions/taskgroup.py
+++ b/task-sdk/src/airflow/sdk/definitions/taskgroup.py
@@ -141,6 +141,10 @@ class TaskGroup(DAGNode):
         if not dag:
             raise RuntimeError("TaskGroup can only be used inside a dag")
 
+    def __getitem__(self, key: str) -> DAGNode:
+        """Return a child node by label via ``tg[label]`` syntax. See `get_child_by_label`."""
+        return self.get_child_by_label(key)
+
     def __attrs_post_init__(self):
         # TODO: If attrs supported init only args we could use that here
         # https://github.com/python-attrs/attrs/issues/342
diff --git a/task-sdk/tests/task_sdk/definitions/test_taskgroup.py b/task-sdk/tests/task_sdk/definitions/test_taskgroup.py
index 73cd272bb9a..2da0b079927 100644
--- a/task-sdk/tests/task_sdk/definitions/test_taskgroup.py
+++ b/task-sdk/tests/task_sdk/definitions/test_taskgroup.py
@@ -223,6 +223,31 @@ def test_build_task_group_with_prefix():
     assert group4.get_child_by_label("task4") == task4
 
 
+@pytest.mark.parametrize(
+    "prefix",
+    [
+        pytest.param(True, id="prefix_on"),
+        pytest.param(False, id="prefix_off"),
+    ],
+)
+def test_taskgroup_getitem_returns_child_by_label(prefix: bool):
+    """Tests that TaskGroup[label] returns the correct child task or subgroup."""
+    logical_date = pendulum.datetime(2020, 1, 1)
+    with DAG("test_getitem", start_date=logical_date):
+        with TaskGroup("group1", prefix_group_id=prefix) as group1:
+            task1 = EmptyOperator(task_id="task1")
+            with TaskGroup("subgroup", prefix_group_id=prefix) as subgroup:
+                task2 = EmptyOperator(task_id="task2")
+
+    assert group1["task1"] == task1
+    assert group1["subgroup"] == subgroup
+    assert group1["subgroup"]["task2"] == task2
+
+    # Missing label raises KeyError
+    with pytest.raises(KeyError):
+        group1["nonexistent"]
+
+
 def test_build_task_group_with_prefix_functionality():
     """
     Tests TaskGroup prefix_group_id functionality - additional test for 
comprehensive coverage.
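
Note on the ``taskgroup.py`` hunk above: ``TaskGroup.__getitem__`` simply forwards to ``get_child_by_label``, so ``tg[label]`` and nested lookups like ``tg["subgroup"]["task2"]`` work, and a missing label surfaces as ``KeyError``. The sketch below shows the same delegation pattern with plain Python stand-ins. ``Node``, ``Group`` and ``add`` are hypothetical names for this illustration; the real ``TaskGroup`` also handles ``prefix_group_id``, which is not modelled here.

```python
class Node:
    """Stand-in for a DAG node identified by a label."""

    def __init__(self, label: str):
        self.label = label


class Group(Node):
    """Stand-in for a task group that stores children by label."""

    def __init__(self, label: str):
        super().__init__(label)
        self.children: dict[str, Node] = {}

    def add(self, child: Node) -> Node:
        self.children[child.label] = child
        return child

    def get_child_by_label(self, label: str) -> Node:
        # A plain dict lookup raises KeyError for an unknown label.
        return self.children[label]

    def __getitem__(self, key: str) -> Node:
        # Subscript access delegates to the existing lookup method.
        return self.get_child_by_label(key)


group1 = Group("group1")
task1 = group1.add(Node("task1"))
subgroup = group1.add(Group("subgroup"))
task2 = subgroup.add(Node("task2"))
```

With these stand-ins, ``group1["task1"]`` returns ``task1`` and ``group1["subgroup"]["task2"]`` returns ``task2``, matching the shape of the assertions in the added test.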
