This is an automated email from the ASF dual-hosted git repository.

o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e36fab0095c Create documentation for `allow_consumer_teams` parameter 
in asset access control (#66821)
e36fab0095c is described below

commit e36fab0095c8a803ec6cadfc7fe3520ebca1d61b
Author: Vincent <[email protected]>
AuthorDate: Fri Jun 12 16:16:33 2026 -0400

    Create documentation for `allow_consumer_teams` parameter in asset access 
control (#66821)
---
 .../docs/authoring-and-scheduling/assets.rst       |  18 +-
 airflow-core/docs/core-concepts/multi-team.rst     | 262 ++++++++++++++++++++-
 docs/spelling_wordlist.txt                         |   1 +
 3 files changed, 262 insertions(+), 19 deletions(-)

diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst 
b/airflow-core/docs/authoring-and-scheduling/assets.rst
index 2bc06a30117..994cae815d3 100644
--- a/airflow-core/docs/authoring-and-scheduling/assets.rst
+++ b/airflow-core/docs/authoring-and-scheduling/assets.rst
@@ -404,9 +404,7 @@ As mentioned in :ref:`Fetching information from previously 
emitted asset events<
             events = inlet_events[AssetAlias("example-alias")]
             last_row_count = events[-1].extra["row_count"]
 
-.. _asset_access_control:
-
-Cross-team asset event filtering with ``access_control``
+Cross-team asset event filtering with ``producer_teams``
 --------------------------------------------------------
 
 .. versionadded:: 3.3.0
@@ -441,9 +439,12 @@ The ``AssetAccessControl`` class accepts the following 
parameters:
 
 - **producer_teams** (``list[str]``, default ``[]``): List of team names 
allowed to produce events
   consumed by this asset's consumers, in addition to the consumer's own team.
-- **allow_global** (``bool``, default ``True``): Whether teamless (global) Dag 
producers can trigger
-  consumers of this asset. When set to ``False``, only Dags with an explicit 
team association
-  (same team or listed in ``producer_teams``) can trigger consumers.
+- **consumer_teams** (``list[str] | None``, default ``None``): List of team 
names allowed to consume
+  events produced by this asset's producers. See
+  :ref:`Cross-team asset event filtering with consumer_teams 
<asset_consumer_teams>`.
+- **allow_global** (``bool``, default ``True``): Whether teamless (global) 
Dags can participate in
+  cross-team event delivery. See :doc:`/core-concepts/multi-team` for the full 
semantics on both
+  consumer-side and producer-side assets.
 
 Blocking global producers
 ~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -476,8 +477,9 @@ Default behavior
 ~~~~~~~~~~~~~~~~
 
 When ``access_control`` is not specified, a default ``AssetAccessControl()`` 
is used (empty
-``producer_teams`` and ``allow_global=True``). The rules depend on whether the 
producer and consumer
-have a team association:
+``producer_teams``, ``consumer_teams=None``, and ``allow_global=True``). See
+:doc:`/core-concepts/multi-team` for the complete behavioral rules table. In 
summary, the rules
+depend on whether the producer and consumer have a team association:
 
 - **Both have the same team**: The event is always delivered.
 - **Producer has a team, consumer has a different team**: The event is blocked 
(unless the
diff --git a/airflow-core/docs/core-concepts/multi-team.rst 
b/airflow-core/docs/core-concepts/multi-team.rst
index 0f11b046932..45b6705440f 100644
--- a/airflow-core/docs/core-concepts/multi-team.rst
+++ b/airflow-core/docs/core-concepts/multi-team.rst
@@ -614,9 +614,13 @@ Default Behavior
 
 By default, a consuming Dag only receives asset events from producers within 
the same team or from Dags with no team association, i.e. global Dags.
 
-Cross-Team Opt-In with ``access_control``
+.. _asset_access_control:
+
+Cross-Team Opt-In with ``producer_teams``
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
+.. versionadded:: 3.3.0
+
 To allow specific teams to produce events that trigger consumers on a given 
asset from another team, use the
 ``access_control`` parameter on the ``Asset`` definition with an 
``AssetAccessControl`` instance:
 
@@ -648,8 +652,93 @@ To block global (teamless) Dag producers from triggering 
consumers, set ``allow_
         ),
     )
 
-See :ref:`Cross-team asset event filtering with access_control 
<asset_access_control>` in the Assets
-documentation for usage details and validation rules.
+.. _asset_consumer_teams:
+
+Cross-Team Opt-In with ``consumer_teams``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. versionadded:: 3.3.0
+
+While ``producer_teams`` is specified on the **consumer** side (on the asset 
used in a Dag's schedule),
+``consumer_teams`` is specified on the **producer** side (on the asset used in 
a task's outlets). It
+controls which consumer teams are permitted to receive events produced by that 
specific task.
+
+.. code-block:: python
+
+    from airflow.sdk import DAG, Asset, AssetAccessControl, task
+
+    restricted_output = Asset(
+        name="restricted_output",
+        uri="s3://bucket/restricted/output.csv",
+        access_control=AssetAccessControl(
+            consumer_teams=["team_downstream", "team_reporting"],
+        ),
+    )
+
+    with DAG(dag_id="producer_dag", schedule="@daily"):
+
+        @task(outlets=[restricted_output])
+        def produce_data():
+            """Only team_downstream and team_reporting can consume events from 
this task."""
+
+With this configuration, only consuming Dags belonging to ``team_downstream`` 
or ``team_reporting`` (plus
+teamless consumers) will receive asset events produced by the ``produce_data`` 
task.
+
+Per-producer scoping
+""""""""""""""""""""
+
+``consumer_teams`` is scoped **per producing task**, not per asset. If 
multiple tasks produce events
+for the same asset, each task's ``consumer_teams`` applies independently to 
the events it produces:
+
+.. code-block:: python
+
+    from airflow.sdk import DAG, Asset, AssetAccessControl, task
+
+    # This task restricts consumers to team_a only
+    restricted_asset = Asset(
+        name="shared_asset",
+        uri="s3://bucket/shared.csv",
+        access_control=AssetAccessControl(
+            consumer_teams=["team_a"],
+        ),
+    )
+
+    # This task has no consumer restriction (empty list = all consumers 
allowed)
+    unrestricted_asset = Asset(name="shared_asset", 
uri="s3://bucket/shared.csv")
+
+    with DAG(dag_id="dag_1", schedule="@daily"):
+
+        @task(outlets=[restricted_asset])
+        def task_restricted():
+            """Events from this task only reach team_a consumers."""
+
+
+    with DAG(dag_id="dag_2", schedule="@daily"):
+
+        @task(outlets=[unrestricted_asset])
+        def task_unrestricted():
+            """Events from this task reach all consumers (no restriction)."""
+
+Interaction with ``producer_teams``
+"""""""""""""""""""""""""""""""""""
+
+Both ``producer_teams`` and ``consumer_teams`` are applied as a **logical 
AND**. A consumer Dag
+is queued only if it passes both checks:
+
+- ``producer_teams`` (consumer side): "Which producer teams am I willing to 
accept events from?"
+- ``consumer_teams`` (producer side): "Which consumer teams am I willing to 
deliver events to?"
+
+For example, if a consumer's schedule reference has 
``producer_teams=["team_x"]`` and the producer's
+outlet reference has ``consumer_teams=["team_y"]``, the consumer will only be 
queued if the producer
+belongs to ``team_x`` **and** the consumer belongs to ``team_y``.
+
+Teamless consumer pass-through
+""""""""""""""""""""""""""""""
+
+Teamless consumers (Dags with no team association) pass through the 
``consumer_teams`` list check,
+regardless of its contents. However, they can still be blocked by setting 
``allow_global=False`` on
+the producer-side asset, which prevents teamless consumers from receiving 
events. By default
+(``allow_global=True``), teamless consumers receive events from all producers.
 
 Behavioral Rules
 ^^^^^^^^^^^^^^^^
@@ -658,11 +747,12 @@ The following table describes the complete filtering 
logic:
 
 .. list-table::
    :header-rows: 1
-   :widths: 15 15 18 14 13 25
+   :widths: 12 12 16 16 10 10 24
 
    * - Producer
      - Consumer
      - ``producer_teams``
+     - ``consumer_teams``
      - ``allow_global``
      - Result
      - Reason
@@ -670,86 +760,142 @@ The following table describes the complete filtering 
logic:
      - Team A
      - (any)
      - (any)
+     - (any)
      - ✅ Allowed
      - Same team
    * - Team A (DAG)
      - Team B
      - ``[]``
+     - ``[]``
      - (any)
      - ❌ Blocked
-     - Different team, no opt-in
+     - Different team, no producer opt-in
+   * - Team A (DAG)
+     - Team B
+     - ``["team_a"]``
+     - ``[]``
+     - (any)
+     - ✅ Allowed
+     - Producer opt-in, no consumer restriction
    * - Team A (DAG)
      - Team B
      - ``["team_a"]``
+     - ``["team_b"]``
      - (any)
      - ✅ Allowed
-     - Cross-team opt-in
+     - Both opt-ins satisfied
+   * - Team A (DAG)
+     - Team B
+     - ``["team_a"]``
+     - ``["team_c"]``
+     - (any)
+     - ❌ Blocked
+     - Consumer team not in consumer_teams
+   * - Team A (DAG)
+     - Team B
+     - ``[]``
+     - ``["team_b"]``
+     - (any)
+     - ❌ Blocked
+     - Producer opt-in not satisfied (AND logic)
    * - (no team, DAG)
      - Team B
      - (any)
+     - ``[]``
      - ``True``
      - ✅ Allowed
      - Global producer, allow_global is True
    * - (no team, DAG)
      - Team B
      - (any)
+     - ``[]``
      - ``False``
      - ❌ Blocked
      - Global producer blocked by allow_global=False
+   * - (no team, DAG)
+     - Team B
+     - (any)
+     - ``["team_b"]``
+     - ``True``
+     - ✅ Allowed
+     - Global producer, allow_global is True, consumer in list
+   * - (no team, DAG)
+     - Team B
+     - (any)
+     - ``["team_c"]``
+     - (any)
+     - ❌ Blocked
+     - Global producer, but consumer not in consumer_teams
    * - Team A (DAG)
      - (no team)
      - (any)
      - (any)
+     - (any)
      - ✅ Allowed
-     - Teamless consumer accepts events from any DAG producer
+     - Teamless consumer passes through (unless producer-side 
``allow_global=False``)
    * - (no team, DAG)
      - (no team)
      - (any)
      - (any)
+     - (any)
      - ✅ Allowed
      - Both global
    * - Team A (API)
      - Team A
      - (any)
      - (any)
+     - (any)
      - ✅ Allowed
      - Same team
    * - Team A (API)
      - Team B
      - ``["team_a"]``
+     - ``[]``
      - (any)
      - ✅ Allowed
-     - Cross-team opt-in
+     - Producer opt-in, no consumer restriction
+   * - Team A (API)
+     - Team B
+     - ``["team_a"]``
+     - ``["team_b"]``
+     - (any)
+     - ✅ Allowed
+     - Both opt-ins satisfied
    * - Team A (API)
      - (no team)
      - (any)
      - (any)
+     - (any)
      - ✅ Allowed
-     - Teamless consumer accepts events from any source
+     - Teamless consumer always passes through
    * - (no team, API)
      - Team B
      - (any)
      - (any)
+     - (any)
      - ❌ Blocked
      - Teamless API user cannot trigger team-bound consumer
    * - (no team, API)
      - (no team)
      - (any)
      - (any)
+     - (any)
      - ✅ Allowed
      - Both global
 
 Key rules:
 
 - **Same team**: Always allowed.
-- **Global (teamless) DAG producer with** ``allow_global=True``: Triggers all 
consumers regardless of team.
+- **Global (teamless) DAG producer with** ``allow_global=True``: Triggers all 
consumers regardless of team (unless ``consumer_teams`` restricts them).
 - **Global (teamless) DAG producer with** ``allow_global=False``: Blocked from 
triggering team-bound consumers.
 - **Teamless API user**: Can only trigger teamless consumers. Unlike a 
teamless DAG — which is
   deployed by a platform operator and intentionally shared — an API user 
without a team has no
   verified team affiliation, so their events are restricted to teamless 
consumers to
   prevent unscoped access to team-bound pipelines.
-- **Teamless consumer**: Accepts events from any source (DAG or API), 
regardless of team.
+- **Teamless consumer**: Accepts events from any source (DAG or API), 
regardless of team or ``consumer_teams``, unless ``allow_global=False`` is set 
on the producer-side asset.
 - **Cross-team via** ``producer_teams``: Allowed when the producer's team is 
listed in the asset's ``producer_teams``.
+- **Cross-team via** ``consumer_teams``: Allowed when the consumer's team is 
listed in the producing task's ``consumer_teams``.
+- **Both filters (AND logic)**: When both ``producer_teams`` and 
``consumer_teams`` are specified, a consumer must pass both checks to be queued.
 - **Multi-Team disabled**: All filtering is skipped; existing behavior is 
preserved.
 
 API-Triggered Events
@@ -759,6 +905,16 @@ When a user creates an asset event via the REST API, the 
user's team is resolved
 The same filtering rules apply, with one distinction: a teamless API user can 
only trigger teamless
 consumers, whereas a teamless DAG producer is treated as global and can 
trigger any consumer.
 
+The REST API also accepts an optional ``access_control`` object in the request 
body with the following
+fields:
+
+- ``consumer_teams`` (``list[str] | null``): restricts which consumer teams 
can receive the event,
+  following the same rules as the task-level ``consumer_teams``. When omitted 
or ``null``, no
+  consumer-team filtering is applied.
+- ``allow_global`` (``bool``, default ``true``): whether teamless consumers 
can receive the event.
+
+When Multi-Team mode is disabled, the ``access_control`` parameter is accepted 
but ignored.
+
 Important Considerations
 ------------------------
 
@@ -780,6 +936,90 @@ Global Uniqueness of Identifiers
 
 **Dag IDs, Variable keys, and Connection IDs must be unique across the entire 
Airflow deployment**, regardless of which team owns them. This is similar to 
how S3 bucket names are globally unique across all AWS accounts. You should 
establish naming conventions within your organization to avoid naming conflicts 
(e.g. prefix identifiers with the team name)
 
+Real-World Example: Cross-Team Data Pipeline
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Consider an e-commerce platform with three teams:
+
+- **team_ingestion**: Ingests raw clickstream data from Kafka into S3 every 
hour.
+- **team_analytics**: Builds aggregated reporting tables from that raw data.
+- **team_ml**: Trains recommendation models using the same raw data.
+
+All three teams reference the same asset 
(``s3://data-lake/clickstream/hourly.parquet``), but in
+different roles:
+
+.. code-block:: python
+
+    # --- team_ingestion's Dag bundle ---
+    from airflow.sdk import DAG, Asset, AssetAccessControl, task
+
+    clickstream = Asset(
+        name="clickstream_hourly",
+        uri="s3://data-lake/clickstream/hourly.parquet",
+        access_control=AssetAccessControl(
+            # Allow team_analytics and team_ml to consume events produced by 
team_ingestion
+            consumer_teams=["team_analytics", "team_ml"],
+        ),
+    )
+
+    with DAG(dag_id="ingest_clickstream", schedule="@hourly"):
+
+        @task(outlets=[clickstream])
+        def ingest_from_kafka():
+            """Pull clickstream events from Kafka and write to S3."""
+
+
+.. code-block:: python
+
+    # --- team_analytics's Dag bundle ---
+    from airflow.sdk import DAG, Asset, AssetAccessControl
+
+    clickstream = Asset(
+        name="clickstream_hourly",
+        uri="s3://data-lake/clickstream/hourly.parquet",
+        access_control=AssetAccessControl(
+            # Accept events from team_ingestion (in addition to own-team 
events)
+            producer_teams=["team_ingestion"],
+        ),
+    )
+
+    with DAG(dag_id="build_reporting_tables", schedule=clickstream):
+        ...
+
+
+.. code-block:: python
+
+    # --- team_ml's Dag bundle ---
+    from airflow.sdk import DAG, Asset, AssetAccessControl
+
+    clickstream = Asset(
+        name="clickstream_hourly",
+        uri="s3://data-lake/clickstream/hourly.parquet",
+        access_control=AssetAccessControl(
+            # Accept events from team_ingestion (in addition to own-team 
events)
+            producer_teams=["team_ingestion"],
+        ),
+    )
+
+    with DAG(dag_id="train_recommendations", schedule=clickstream):
+        ...
+
+In this setup:
+
+- The ``clickstream_hourly`` asset is the same global object across all three 
teams.
+- When ``team_ingestion``'s ``ingest_from_kafka`` task completes, it emits an 
asset event.
+- ``team_analytics``'s ``build_reporting_tables`` and ``team_ml``'s 
``train_recommendations`` both
+  receive the event because:
+
+  1. The **consumer side** (``producer_teams=["team_ingestion"]``) opts in to 
accept events from
+     ``team_ingestion``.
+  2. The **producer side** (``consumer_teams=["team_analytics", "team_ml"]``) 
opts in to deliver
+     events to those consumer teams.
+
+- A Dag from an unrelated ``team_marketing`` would **not** receive the event, 
because it is
+  neither listed in ``consumer_teams`` on the producer side nor does it list 
``team_ingestion``
+  in its own ``producer_teams``.
+
 Architecture
 ------------
 
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index b0c7bea4c81..67a7e1fa01b 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -249,6 +249,7 @@ cli
 ClickHouse
 clickhouse
 clickhousedb
+clickstream
 ClientSecretCredential
 cloudant
 CloudantV

Reply via email to