This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f72e4c41718 Add `team_name` column to trigger table for multi-team
triggerer support (#67305)
f72e4c41718 is described below
commit f72e4c417187335b3124a5cfe0766df32b5d6aba
Author: Ramit Kataria <[email protected]>
AuthorDate: Mon May 25 08:59:45 2026 -0700
Add `team_name` column to trigger table for multi-team triggerer support
(#67305)
* Add `team_name` column to trigger table for multi-team triggerer support
Adds a nullable `team_name` column (FK -> team.name, ondelete=SET NULL) with
an index to the trigger table. This is the schema foundation for team-scoped
triggerer instances. Follow-up PRs will populate the column at trigger
creation time and add query filtering so each triggerer only processes its
team's triggers.
When a team is deleted, its in-flight triggers become global (`team_name` is set
to NULL) and are picked up by the global triggerer to run to completion.
No behavioral change: this PR only adds the column, the migration, and the
constructor parameter.
* Add code comment for context about denormalization
---
airflow-core/docs/migrations-ref.rst | 4 +-
.../0116_3_3_0_add_team_name_to_trigger_table.py | 55 ++++++++++++++++++++++
airflow-core/src/airflow/models/trigger.py | 13 ++++-
airflow-core/src/airflow/utils/db.py | 2 +-
airflow-core/tests/unit/models/test_trigger.py | 11 +++++
5 files changed, 82 insertions(+), 3 deletions(-)
diff --git a/airflow-core/docs/migrations-ref.rst
b/airflow-core/docs/migrations-ref.rst
index 42056f2c6cb..82f32c8a2fd 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are
executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description
|
+=========================+==================+===================+==============================================================+
-| ``a1b2c3d4e5f6`` (head) | ``a7f3b2c1d4e5`` | ``3.3.0`` | Add
version_data to dag_version. |
+| ``acc215baed80`` (head) | ``a1b2c3d4e5f6`` | ``3.3.0`` | Add
team_name to trigger table. |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``a1b2c3d4e5f6`` | ``a7f3b2c1d4e5`` | ``3.3.0`` | Add
version_data to dag_version. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``a7f3b2c1d4e5`` | ``b8f3e4a1d2c9`` | ``3.3.0`` | Add access
control columns to dag_schedule_asset_reference |
| | | | table.
|
diff --git
a/airflow-core/src/airflow/migrations/versions/0116_3_3_0_add_team_name_to_trigger_table.py
b/airflow-core/src/airflow/migrations/versions/0116_3_3_0_add_team_name_to_trigger_table.py
new file mode 100644
index 00000000000..503f1b58d7c
--- /dev/null
+++
b/airflow-core/src/airflow/migrations/versions/0116_3_3_0_add_team_name_to_trigger_table.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add team_name to trigger table.
+
+Revision ID: acc215baed80
+Revises: a1b2c3d4e5f6
+Create Date: 2026-05-21 21:38:00.122692
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "acc215baed80"
+down_revision = "a1b2c3d4e5f6"
+branch_labels = None
+depends_on = None
+airflow_version = "3.3.0"
+
+
+def upgrade():
+ """Add team_name to trigger table."""
+ with op.batch_alter_table("trigger", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("team_name", sa.String(length=50),
nullable=True))
+ batch_op.create_index(batch_op.f("idx_trigger_team_name"),
["team_name"], unique=False)
+ batch_op.create_foreign_key(
+ batch_op.f("trigger_team_name_fkey"), "team", ["team_name"],
["name"], ondelete="SET NULL"
+ )
+
+
+def downgrade():
+ """Remove team_name from trigger table."""
+ with op.batch_alter_table("trigger", schema=None) as batch_op:
+ batch_op.drop_constraint(batch_op.f("trigger_team_name_fkey"),
type_="foreignkey")
+ batch_op.drop_index(batch_op.f("idx_trigger_team_name"))
+ batch_op.drop_column("team_name")
diff --git a/airflow-core/src/airflow/models/trigger.py
b/airflow-core/src/airflow/models/trigger.py
index a6ee583032a..0d15d5ac497 100644
--- a/airflow-core/src/airflow/models/trigger.py
+++ b/airflow-core/src/airflow/models/trigger.py
@@ -24,7 +24,7 @@ from functools import singledispatch
from traceback import format_exception
from typing import TYPE_CHECKING, Any
-from sqlalchemy import Integer, String, Text, delete, func, or_, select, update
+from sqlalchemy import ForeignKey, Integer, String, Text, delete, func, or_,
select, update
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import Mapped, Session, mapped_column, relationship,
selectinload
from sqlalchemy.sql.functions import coalesce
@@ -100,6 +100,15 @@ class Trigger(Base):
triggerer_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
queue: Mapped[str | None] = mapped_column(String(256), nullable=True)
+ # Denormalized from dag_bundle_team to keep the triggerer's ~1s polling
queries join-free,
+ # especially since it's eventually consistent and trigger rows are
ephemeral.
+ # Without this, filtering by team requires 2-3 joins depending on trigger
type.
+ # Performance testing confirmed the denormalized column avoids measurable
overhead in the
+ # triggerer loop under load.
+ team_name: Mapped[str | None] = mapped_column(
+ String(50), ForeignKey("team.name", ondelete="SET NULL"),
nullable=True, index=True
+ )
+
triggerer_job = relationship(
"Job",
primaryjoin="Job.id == Trigger.triggerer_id",
@@ -122,12 +131,14 @@ class Trigger(Base):
kwargs: dict[str, Any],
created_date: datetime.datetime | None = None,
queue: str | None = None,
+ team_name: str | None = None,
) -> None:
super().__init__()
self.classpath = classpath
self.encrypted_kwargs = self.encrypt_kwargs(kwargs)
self.created_date = created_date or timezone.utcnow()
self.queue = queue
+ self.team_name = team_name
@property
def kwargs(self) -> dict[str, Any]:
diff --git a/airflow-core/src/airflow/utils/db.py
b/airflow-core/src/airflow/utils/db.py
index 9b747d7fc3f..00d512909dc 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -116,7 +116,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
"3.1.0": "cc92b33c6709",
"3.1.8": "509b94a1042d",
"3.2.0": "1d6611b6ab7c",
- "3.3.0": "a1b2c3d4e5f6",
+ "3.3.0": "acc215baed80",
}
# Prefix used to identify tables holding data moved during migration.
diff --git a/airflow-core/tests/unit/models/test_trigger.py
b/airflow-core/tests/unit/models/test_trigger.py
index c79a021f8b8..5ee079f1f0f 100644
--- a/airflow-core/tests/unit/models/test_trigger.py
+++ b/airflow-core/tests/unit/models/test_trigger.py
@@ -83,6 +83,17 @@ def clear_db(session):
session.commit()
+def test_trigger_team_name_stored(session, testing_team):
+ trigger = Trigger(
+ classpath="airflow.triggers.testing.SuccessTrigger", kwargs={},
team_name=testing_team.name
+ )
+ session.add(trigger)
+ session.flush()
+
+ loaded = session.get(Trigger, trigger.id)
+ assert loaded.team_name == "testing"
+
+
def test_fetch_trigger_ids_with_non_task_associations(session):
# Create triggers
asset_trigger =
Trigger(classpath="airflow.triggers.testing.SuccessTrigger1", kwargs={})