This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bd9270d5441 Fix bulk create+overwrite silently resetting unset fields
on pools and connections (#68645)
bd9270d5441 is described below
commit bd9270d544186d92e4db9f7ee366cb5b194729b3
Author: Sean Ghaeli <[email protected]>
AuthorDate: Wed Jun 17 06:35:36 2026 -0700
Fix bulk create+overwrite silently resetting unset fields on pools and
connections (#68645)
A bulk 'create' with action_on_existence=overwrite dumped the whole request
body
and set every field on the existing record, so fields the request omitted
were
reset to their defaults. For multi-team setups this silently nulled an
existing
pool's or connection's team_name ownership when an overwrite changed only
e.g.
slots; description and include_deferred were affected too.
Only write the fields the request actually provided
(model_dump(exclude_unset=True)),
so omitted fields keep their current value while explicitly-set fields
(even None)
are still applied. Adds regression tests for both pools and connections.
---
.../core_api/services/public/connections.py | 7 ++-
.../api_fastapi/core_api/services/public/pools.py | 9 ++-
.../core_api/routes/public/test_connections.py | 34 +++++++++++
.../core_api/routes/public/test_pools.py | 65 ++++++++++++++++++++++
4 files changed, 113 insertions(+), 2 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py
index bc6e5e49f75..74f5d05df77 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py
+++
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py
@@ -121,7 +121,12 @@ class BulkConnectionService(BulkService[ConnectionBody]):
if connection.connection_id in create_connection_ids:
if connection.connection_id in matched_connection_ids:
existed_connection =
existed_connections_dict[connection.connection_id]
- for key, val in
connection.model_dump(by_alias=True).items():
+ # Only overwrite fields the request actually provided
(see pools.py for the
+ # full rationale). Plain ``model_dump()`` resets
omitted fields to their
+ # defaults on the existing connection — e.g. silently
nulling ``team_name``
+ # multi-team ownership. ``exclude_unset=True`` writes
only the fields present
+ # in the request body.
+ for key, val in connection.model_dump(by_alias=True,
exclude_unset=True).items():
setattr(existed_connection, key, val)
else:
self.session.add(Connection(**connection.model_dump(by_alias=True)))
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
index dce54099772..476c45b8f5c 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
@@ -149,7 +149,14 @@ class BulkPoolService(BulkService[PoolBody]):
if pool.pool in create_pool_names:
if pool.pool in matched_pool_names:
existed_pool = existing_pools_dict[pool.pool]
- for key, val in pool.model_dump().items():
+ # Only overwrite fields the request actually provided.
Plain ``model_dump()``
+ # emits every field at its default, so an overwrite
that omits e.g.
+ # ``team_name``/``description``/``include_deferred``
silently resets them on the
+ # existing pool — most damagingly nulling its
multi-team ``team_name`` ownership.
+ # ``exclude_unset=True`` writes only fields present in
the request body, so
+ # omitted fields keep their current value while an
explicitly-set field (even
+ # ``None``) is still applied.
+ for key, val in
pool.model_dump(exclude_unset=True).items():
setattr(existed_pool, key, val)
else:
self.session.add(Pool(**pool.model_dump()))
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
index 6930039e1ab..74920b11d51 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
@@ -1984,6 +1984,40 @@ class TestBulkConnections(TestConnectionEndpoint):
expected_error_conn_ids = {err["input"]["connection_id"] for err in
detail}
assert sorted(expected_error_conn_ids) == ["test_conn_id_2",
"test_conn_id_3"]
+ @conf_vars({("core", "multi_team"): "True"})
+ def test_bulk_create_overwrite_preserves_unset_team_name(self,
test_client, testing_team, session):
+ """A bulk create+overwrite that omits ``team_name`` must NOT reset an
existing connection's
+ ``team_name`` to ``None`` (parity with the pools fix). Overwriting
with only ``conn_type``
+ previously clobbered every unset field via
``model_dump(by_alias=True)`` — silently nulling
+ the connection's multi-team ownership. ``exclude_unset=True``
preserves omitted fields.
+ """
+ self.create_connection(team_name=testing_team.name)
+ before = session.scalar(select(Connection).where(Connection.conn_id ==
TEST_CONN_ID))
+ assert before.team_name == testing_team.name
+
+ response = test_client.patch(
+ "/connections",
+ json={
+ "actions": [
+ {
+ "action": "create",
+ "action_on_existence": "overwrite",
+ "entities": [{"connection_id": TEST_CONN_ID,
"conn_type": "new_type"}],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ assert response.json()["create"]["success"] == [TEST_CONN_ID]
+
+ session.expire_all()
+ after = session.scalar(select(Connection).where(Connection.conn_id ==
TEST_CONN_ID))
+ assert after.conn_type == "new_type" # provided field is applied
+ assert after.team_name == testing_team.name, (
+ "bulk overwrite that omitted team_name must preserve existing
ownership, "
+ f"got team_name={after.team_name!r}"
+ )
+
class TestPostConnectionExtraBackwardCompatibility(TestConnectionEndpoint):
def test_post_should_accept_empty_string_as_extra(self, test_client,
session):
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
index 6e17598f07e..a383713fe88 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
@@ -1240,3 +1240,68 @@ class TestBulkPools(TestPoolsEndpoint):
expected_error_names = {err["input"]["name"] for err in detail}
assert sorted(expected_error_names) == ["pool_2", "pool_3"]
+
+ @conf_vars({("core", "multi_team"): "True"})
+ def test_bulk_create_overwrite_preserves_unset_team_name(self,
test_client, session):
+ """A bulk create+overwrite that omits ``team_name`` must NOT reset an
existing pool's
+ ``team_name`` to ``None``.
+
+ ``POOL1_NAME`` is owned by team ``test``. Overwriting it with a body
that changes only
+ ``slots`` (no ``team_name``) previously clobbered every unset field
back to its default via
+ ``model_dump()`` — silently nulling the pool's multi-team ownership.
With
+ ``model_dump(exclude_unset=True)`` the omitted ``team_name`` keeps its
current value.
+ """
+ self.create_pools()
+ before = session.scalar(select(Pool).where(Pool.pool == POOL1_NAME))
+ assert before.team_name == "test"
+
+ response = test_client.patch(
+ "/pools",
+ json={
+ "actions": [
+ {
+ "action": "create",
+ "action_on_existence": "overwrite",
+ "entities": [{"name": POOL1_NAME, "slots": 99}],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ assert response.json()["create"]["success"] == [POOL1_NAME]
+
+ session.expire_all()
+ after = session.scalar(select(Pool).where(Pool.pool == POOL1_NAME))
+ assert after.slots == 99 # the field that WAS provided is applied
+ assert after.team_name == "test", (
+ "bulk overwrite that omitted team_name must preserve existing
ownership, "
+ f"got team_name={after.team_name!r}"
+ )
+
+ @conf_vars({("core", "multi_team"): "True"})
+ def test_bulk_create_overwrite_applies_explicit_team_name(self,
test_client, session):
+ """An explicitly-provided ``team_name`` on a bulk overwrite is still
applied (the fix only
+ skips *omitted* fields, it must not skip fields the request actually
set)."""
+ _create_team()
+ session.add(Team(name="other"))
+ session.commit()
+ _create_pools() # POOL1 owned by team "test"
+
+ response = test_client.patch(
+ "/pools",
+ json={
+ "actions": [
+ {
+ "action": "create",
+ "action_on_existence": "overwrite",
+ "entities": [{"name": POOL1_NAME, "slots": 7,
"team_name": "other"}],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+
+ session.expire_all()
+ after = session.scalar(select(Pool).where(Pool.pool == POOL1_NAME))
+ assert after.team_name == "other"
+ assert after.slots == 7