This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bd9270d5441 Fix bulk create+overwrite silently resetting unset fields 
on pools and connections (#68645)
bd9270d5441 is described below

commit bd9270d544186d92e4db9f7ee366cb5b194729b3
Author: Sean Ghaeli <[email protected]>
AuthorDate: Wed Jun 17 06:35:36 2026 -0700

    Fix bulk create+overwrite silently resetting unset fields on pools and 
connections (#68645)
    
    A bulk 'create' with action_on_existence=overwrite dumped the whole request 
body
    and set every field on the existing record, so fields the request omitted 
were
    reset to their defaults. For multi-team setups this silently nulled an 
existing
    pool's or connection's team_name ownership when an overwrite changed only 
e.g.
    slots; description and include_deferred were affected too.
    
    Only write the fields the request actually provided 
(model_dump(exclude_unset=True)),
    so omitted fields keep their current value while explicitly-set fields 
(even None)
    are still applied. Adds regression tests for both pools and connections.
---
 .../core_api/services/public/connections.py        |  7 ++-
 .../api_fastapi/core_api/services/public/pools.py  |  9 ++-
 .../core_api/routes/public/test_connections.py     | 34 +++++++++++
 .../core_api/routes/public/test_pools.py           | 65 ++++++++++++++++++++++
 4 files changed, 113 insertions(+), 2 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py
index bc6e5e49f75..74f5d05df77 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py
@@ -121,7 +121,12 @@ class BulkConnectionService(BulkService[ConnectionBody]):
                 if connection.connection_id in create_connection_ids:
                     if connection.connection_id in matched_connection_ids:
                         existed_connection = 
existed_connections_dict[connection.connection_id]
-                        for key, val in 
connection.model_dump(by_alias=True).items():
+                        # Only overwrite fields the request actually provided 
(see pools.py for the
+                        # full rationale). Plain ``model_dump()`` resets 
omitted fields to their
+                        # defaults on the existing connection — e.g. silently 
nulling ``team_name``
+                        # multi-team ownership. ``exclude_unset=True`` writes 
only the fields present
+                        # in the request body.
+                        for key, val in connection.model_dump(by_alias=True, 
exclude_unset=True).items():
                             setattr(existed_connection, key, val)
                     else:
                         
self.session.add(Connection(**connection.model_dump(by_alias=True)))
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
index dce54099772..476c45b8f5c 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
@@ -149,7 +149,14 @@ class BulkPoolService(BulkService[PoolBody]):
                 if pool.pool in create_pool_names:
                     if pool.pool in matched_pool_names:
                         existed_pool = existing_pools_dict[pool.pool]
-                        for key, val in pool.model_dump().items():
+                        # Only overwrite fields the request actually provided. 
Plain ``model_dump()``
+                        # emits every field at its default, so an overwrite 
that omits e.g.
+                        # ``team_name``/``description``/``include_deferred`` 
silently resets them on the
+                        # existing pool — most damagingly nulling its 
multi-team ``team_name`` ownership.
+                        # ``exclude_unset=True`` writes only fields present in 
the request body, so
+                        # omitted fields keep their current value while an 
explicitly-set field (even
+                        # ``None``) is still applied.
+                        for key, val in 
pool.model_dump(exclude_unset=True).items():
                             setattr(existed_pool, key, val)
                     else:
                         self.session.add(Pool(**pool.model_dump()))
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
index 6930039e1ab..74920b11d51 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
@@ -1984,6 +1984,40 @@ class TestBulkConnections(TestConnectionEndpoint):
         expected_error_conn_ids = {err["input"]["connection_id"] for err in 
detail}
         assert sorted(expected_error_conn_ids) == ["test_conn_id_2", 
"test_conn_id_3"]
 
+    @conf_vars({("core", "multi_team"): "True"})
+    def test_bulk_create_overwrite_preserves_unset_team_name(self, 
test_client, testing_team, session):
+        """A bulk create+overwrite that omits ``team_name`` must NOT reset an 
existing connection's
+        ``team_name`` to ``None`` (parity with the pools fix). Overwriting 
with only ``conn_type``
+        previously clobbered every unset field via 
``model_dump(by_alias=True)`` — silently nulling
+        the connection's multi-team ownership. ``exclude_unset=True`` 
preserves omitted fields.
+        """
+        self.create_connection(team_name=testing_team.name)
+        before = session.scalar(select(Connection).where(Connection.conn_id == 
TEST_CONN_ID))
+        assert before.team_name == testing_team.name
+
+        response = test_client.patch(
+            "/connections",
+            json={
+                "actions": [
+                    {
+                        "action": "create",
+                        "action_on_existence": "overwrite",
+                        "entities": [{"connection_id": TEST_CONN_ID, 
"conn_type": "new_type"}],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        assert response.json()["create"]["success"] == [TEST_CONN_ID]
+
+        session.expire_all()
+        after = session.scalar(select(Connection).where(Connection.conn_id == 
TEST_CONN_ID))
+        assert after.conn_type == "new_type"  # provided field is applied
+        assert after.team_name == testing_team.name, (
+            "bulk overwrite that omitted team_name must preserve existing 
ownership, "
+            f"got team_name={after.team_name!r}"
+        )
+
 
 class TestPostConnectionExtraBackwardCompatibility(TestConnectionEndpoint):
     def test_post_should_accept_empty_string_as_extra(self, test_client, 
session):
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
index 6e17598f07e..a383713fe88 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
@@ -1240,3 +1240,68 @@ class TestBulkPools(TestPoolsEndpoint):
 
         expected_error_names = {err["input"]["name"] for err in detail}
         assert sorted(expected_error_names) == ["pool_2", "pool_3"]
+
+    @conf_vars({("core", "multi_team"): "True"})
+    def test_bulk_create_overwrite_preserves_unset_team_name(self, 
test_client, session):
+        """A bulk create+overwrite that omits ``team_name`` must NOT reset an 
existing pool's
+        ``team_name`` to ``None``.
+
+        ``POOL1_NAME`` is owned by team ``test``. Overwriting it with a body 
that changes only
+        ``slots`` (no ``team_name``) previously clobbered every unset field 
back to its default via
+        ``model_dump()`` — silently nulling the pool's multi-team ownership. 
With
+        ``model_dump(exclude_unset=True)`` the omitted ``team_name`` keeps its 
current value.
+        """
+        self.create_pools()
+        before = session.scalar(select(Pool).where(Pool.pool == POOL1_NAME))
+        assert before.team_name == "test"
+
+        response = test_client.patch(
+            "/pools",
+            json={
+                "actions": [
+                    {
+                        "action": "create",
+                        "action_on_existence": "overwrite",
+                        "entities": [{"name": POOL1_NAME, "slots": 99}],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        assert response.json()["create"]["success"] == [POOL1_NAME]
+
+        session.expire_all()
+        after = session.scalar(select(Pool).where(Pool.pool == POOL1_NAME))
+        assert after.slots == 99  # the field that WAS provided is applied
+        assert after.team_name == "test", (
+            "bulk overwrite that omitted team_name must preserve existing 
ownership, "
+            f"got team_name={after.team_name!r}"
+        )
+
+    @conf_vars({("core", "multi_team"): "True"})
+    def test_bulk_create_overwrite_applies_explicit_team_name(self, 
test_client, session):
+        """An explicitly-provided ``team_name`` on a bulk overwrite is still 
applied (the fix only
+        skips *omitted* fields, it must not skip fields the request actually 
set)."""
+        _create_team()
+        session.add(Team(name="other"))
+        session.commit()
+        _create_pools()  # POOL1 owned by team "test"
+
+        response = test_client.patch(
+            "/pools",
+            json={
+                "actions": [
+                    {
+                        "action": "create",
+                        "action_on_existence": "overwrite",
+                        "entities": [{"name": POOL1_NAME, "slots": 7, 
"team_name": "other"}],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+
+        session.expire_all()
+        after = session.scalar(select(Pool).where(Pool.pool == POOL1_NAME))
+        assert after.team_name == "other"
+        assert after.slots == 7

Reply via email to