This is an automated email from the ASF dual-hosted git repository.

jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 50fa403ed61 Add e2e test suite for Airflow event-driven DAGs with 
Apache Kafka (#64833)
50fa403ed61 is described below

commit 50fa403ed618d1eec165993dc773567dcf31d673
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Thu May 14 13:32:03 2026 +0800

    Add e2e test suite for Airflow event-driven DAGs with Apache Kafka (#64833)
    
    * Add e2e test suite for Airflow event-driven DAGs with Apache Kafka
    
    * Add 'event_driven' option to E2E test mode in testing_commands.py
    
    * Address review feedback for event-driven E2E tests
    
    - Use timedelta(seconds=1) for retry_delay instead of bare int
    - Return exactly expected_count runs from _wait_for_consumer_dag_runs
    - Replace fixed 30s sleep with polling for Kafka consumer group registration
    - Pin --partitions 1 --replication-factor 1 for deterministic topic creation
    - Add kafka.yml pattern to selective checks file group
    - Remove container_name and unused JMX port from kafka.yml
    - Regenerate breeze CLI doc images
    
    * Fix AwaitTrigger and airflow e2e test
    
    * Add retry for un_pause_dag method
    
    * Add additional requirements for Kafka integration in event-driven setup
    
    * Add pytest output
    
    * fixup: Refactor comments and documentation for consistency in DAG 
terminology and update regex patterns for Kafka provider paths
    
    * fixup: Add entry in ci-amd.yml
    
    * fixup: Remove debug -sv flag
    
    * Apply suggestions from code review
    
    Co-authored-by: Jens Scheffler <[email protected]>
    
    * fixup: mount latest provider changes instead of installing from pypi
    
    ---------
    
    Co-authored-by: Jens Scheffler <[email protected]>
---
 .github/workflows/additional-prod-image-tests.yml  |  16 ++
 .github/workflows/airflow-e2e-tests.yml            |   4 +-
 .github/workflows/ci-amd.yml                       |   2 +
 .github/workflows/ci-arm.yml                       |   2 +
 airflow-e2e-tests/docker/kafka.yml                 |  56 +++++
 airflow-e2e-tests/docker/kafka/update_run.sh       |  27 +++
 .../tests/airflow_e2e_tests/conftest.py            | 105 ++++++++++
 .../tests/airflow_e2e_tests/constants.py           |  15 ++
 .../airflow_e2e_tests/dags/example_event_driven.py | 177 ++++++++++++++++
 .../airflow_e2e_tests/e2e_test_utils/clients.py    |  30 +++
 .../event_driven_tests/__init__.py                 |  16 ++
 .../event_driven_tests/test_event_driven.py        | 229 +++++++++++++++++++++
 .../images/output_testing_airflow-e2e-tests.svg    |   2 +-
 .../images/output_testing_airflow-e2e-tests.txt    |   2 +-
 .../airflow_breeze/commands/testing_commands.py    |   9 +-
 .../src/airflow_breeze/utils/selective_checks.py   |  14 ++
 16 files changed, 701 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/additional-prod-image-tests.yml 
b/.github/workflows/additional-prod-image-tests.yml
index 00fd2edd989..5138c0994ab 100644
--- a/.github/workflows/additional-prod-image-tests.yml
+++ b/.github/workflows/additional-prod-image-tests.yml
@@ -48,6 +48,10 @@ on:  # yamllint disable-line rule:truthy
         description: "Whether to run OpenSearch remote logging e2e tests 
(true/false)"
         required: true
         type: string
+      run-event-driven-e2e-tests:
+        description: "Whether to run event driven e2e tests (true/false)"
+        required: true
+        type: string
       constraints-branch:
         description: "Branch used to construct constraints URL from."
         required: true
@@ -274,6 +278,18 @@ jobs:
       use-uv: ${{ inputs.use-uv }}
       e2e_test_mode: "xcom_object_storage"
 
+  test-e2e-integration-tests-event-driven:
+    name: "Event driven tests with PROD image"
+    uses: ./.github/workflows/airflow-e2e-tests.yml
+    with:
+      workflow-name: "Event driven e2e test"
+      runners: ${{ inputs.runners }}
+      platform: ${{ inputs.platform }}
+      default-python-version: "${{ inputs.default-python-version }}"
+      use-uv: ${{ inputs.use-uv }}
+      e2e_test_mode: "event_driven"
+    if: inputs.canary-run == 'true' || inputs.run-event-driven-e2e-tests == 
'true'
+
   test-ui-e2e-chromium:
     name: "Chromium UI e2e tests with PROD image"
     uses: ./.github/workflows/ui-e2e-tests.yml
diff --git a/.github/workflows/airflow-e2e-tests.yml 
b/.github/workflows/airflow-e2e-tests.yml
index 408529f38a4..e7f1a8c790c 100644
--- a/.github/workflows/airflow-e2e-tests.yml
+++ b/.github/workflows/airflow-e2e-tests.yml
@@ -49,7 +49,7 @@ on:  # yamllint disable-line rule:truthy
         type: string
         required: true
       e2e_test_mode:
-        description: "Test mode - basic, remote_log, remote_log_elasticsearch, 
remote_log_opensearch, or xcom_object_storage"  # yamllint disable-line 
rule:line-length
+        description: "Test mode - basic, remote_log, remote_log_elasticsearch, 
remote_log_opensearch, xcom_object_storage, or event_driven"  # yamllint 
disable-line rule:line-length
         type: string
         default: "basic"
 
@@ -80,7 +80,7 @@ on:  # yamllint disable-line rule:truthy
         type: string
         default: ""
       e2e_test_mode:
-        description: "Test mode - basic, remote_log, remote_log_elasticsearch, 
remote_log_opensearch, or xcom_object_storage"  # yamllint disable-line 
rule:line-length
+        description: "Test mode - basic, remote_log, remote_log_elasticsearch, 
remote_log_opensearch, xcom_object_storage, or event_driven"  # yamllint 
disable-line rule:line-length
         type: string
         default: "basic"
 
diff --git a/.github/workflows/ci-amd.yml b/.github/workflows/ci-amd.yml
index 1e79582747a..47b64293087 100644
--- a/.github/workflows/ci-amd.yml
+++ b/.github/workflows/ci-amd.yml
@@ -132,6 +132,7 @@ jobs:
       run-remote-logging-elasticsearch-e2e-tests: ${{ 
steps.selective-checks.outputs.run-remote-logging-elasticsearch-e2e-tests }}
       run-remote-logging-opensearch-e2e-tests: ${{ 
steps.selective-checks.outputs.run-remote-logging-opensearch-e2e-tests }}
       run-remote-logging-s3-e2e-tests: ${{ 
steps.selective-checks.outputs.run-remote-logging-s3-e2e-tests }}
+      run-event-driven-e2e-tests: ${{ 
steps.selective-checks.outputs.run-event-driven-e2e-tests }}
       run-system-tests: ${{ steps.selective-checks.outputs.run-system-tests }}
       run-task-sdk-tests: ${{ 
steps.selective-checks.outputs.run-task-sdk-tests }}
       run-task-sdk-integration-tests: ${{ 
steps.selective-checks.outputs.run-task-sdk-integration-tests }}
@@ -869,6 +870,7 @@ jobs:
       run-remote-logging-elasticsearch-e2e-tests: ${{ 
needs.build-info.outputs.run-remote-logging-elasticsearch-e2e-tests }}
       run-remote-logging-opensearch-e2e-tests: ${{ 
needs.build-info.outputs.run-remote-logging-opensearch-e2e-tests }}
       run-remote-logging-s3-e2e-tests: ${{ 
needs.build-info.outputs.run-remote-logging-s3-e2e-tests }}
+      run-event-driven-e2e-tests: ${{ 
needs.build-info.outputs.run-event-driven-e2e-tests }}
       use-uv: ${{ needs.build-info.outputs.use-uv }}
       run-ui-e2e-tests: ${{ needs.build-info.outputs.run-ui-e2e-tests }}
       run-airflow-ctl-integration-tests: ${{ 
needs.build-info.outputs.run-airflow-ctl-integration-tests }}
diff --git a/.github/workflows/ci-arm.yml b/.github/workflows/ci-arm.yml
index 2cf141afe7c..62ac6a2d99c 100644
--- a/.github/workflows/ci-arm.yml
+++ b/.github/workflows/ci-arm.yml
@@ -122,6 +122,7 @@ jobs:
       run-remote-logging-elasticsearch-e2e-tests: ${{ 
steps.selective-checks.outputs.run-remote-logging-elasticsearch-e2e-tests }}
       run-remote-logging-opensearch-e2e-tests: ${{ 
steps.selective-checks.outputs.run-remote-logging-opensearch-e2e-tests }}
       run-remote-logging-s3-e2e-tests: ${{ 
steps.selective-checks.outputs.run-remote-logging-s3-e2e-tests }}
+      run-event-driven-e2e-tests: ${{ 
steps.selective-checks.outputs.run-event-driven-e2e-tests }}
       run-system-tests: ${{ steps.selective-checks.outputs.run-system-tests }}
       run-task-sdk-tests: ${{ 
steps.selective-checks.outputs.run-task-sdk-tests }}
       run-task-sdk-integration-tests: ${{ 
steps.selective-checks.outputs.run-task-sdk-integration-tests }}
@@ -859,6 +860,7 @@ jobs:
       run-remote-logging-elasticsearch-e2e-tests: ${{ 
needs.build-info.outputs.run-remote-logging-elasticsearch-e2e-tests }}
       run-remote-logging-opensearch-e2e-tests: ${{ 
needs.build-info.outputs.run-remote-logging-opensearch-e2e-tests }}
       run-remote-logging-s3-e2e-tests: ${{ 
needs.build-info.outputs.run-remote-logging-s3-e2e-tests }}
+      run-event-driven-e2e-tests: ${{ 
needs.build-info.outputs.run-event-driven-e2e-tests }}
       use-uv: ${{ needs.build-info.outputs.use-uv }}
       run-ui-e2e-tests: ${{ needs.build-info.outputs.run-ui-e2e-tests }}
       run-airflow-ctl-integration-tests: ${{ 
needs.build-info.outputs.run-airflow-ctl-integration-tests }}
diff --git a/airflow-e2e-tests/docker/kafka.yml 
b/airflow-e2e-tests/docker/kafka.yml
new file mode 100644
index 00000000000..216f34f66d0
--- /dev/null
+++ b/airflow-e2e-tests/docker/kafka.yml
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+services:
+  broker:
+    image: confluentinc/cp-kafka:7.3.0
+    labels:
+      breeze.description: "Integration required for Kafka hooks."
+    hostname: broker
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: >
+        CONTROLLER:PLAINTEXT,
+        PLAINTEXT:PLAINTEXT,
+        PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: >
+        PLAINTEXT://broker:29092,
+        PLAINTEXT_HOST://localhost:9092
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_JMX_PORT: 9101
+      KAFKA_JMX_HOSTNAME: localhost
+      KAFKA_PROCESS_ROLES: 'broker,controller'
+      KAFKA_NODE_ID: 1
+      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
+      KAFKA_LISTENERS: >
+        PLAINTEXT://broker:29092,
+        CONTROLLER://broker:29093,
+        PLAINTEXT_HOST://0.0.0.0:9092
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
+      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
+      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
+    volumes:
+      - ./kafka/update_run.sh:/tmp/update_run.sh
+    command: >
+      bash -c 'if [ ! -f /tmp/update_run.sh ];
+      then echo "ERROR: update_run.sh not mounted?"
+      && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'
diff --git a/airflow-e2e-tests/docker/kafka/update_run.sh 
b/airflow-e2e-tests/docker/kafka/update_run.sh
new file mode 100755
index 00000000000..d12bdb332b3
--- /dev/null
+++ b/airflow-e2e-tests/docker/kafka/update_run.sh
@@ -0,0 +1,27 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Docker workaround: Remove check for KAFKA_ZOOKEEPER_CONNECT parameter
+sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
+
+# Docker workaround: Ignore cub zk-ready
+sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure
+
+# KRaft required step: Format the storage directory with a new cluster ID
+echo "kafka-storage format --ignore-formatted --cluster-id=$(kafka-storage 
random-uuid) -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py 
b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
index 4443f7d8a28..a6554606b80 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
@@ -27,6 +27,7 @@ from rich.console import Console
 from testcontainers.compose import DockerCompose
 
 from airflow_e2e_tests.constants import (
+    AIRFLOW_SERVICES_FOR_PROVIDER_MOUNT,
     AWS_INIT_PATH,
     DOCKER_COMPOSE_HOST_PORT,
     DOCKER_COMPOSE_PATH,
@@ -34,9 +35,12 @@ from airflow_e2e_tests.constants import (
     E2E_DAGS_FOLDER,
     E2E_TEST_MODE,
     ELASTICSEARCH_PATH,
+    KAFKA_DIR_PATH,
     LOCALSTACK_PATH,
     LOGS_FOLDER,
     OPENSEARCH_PATH,
+    PROVIDERS_MOUNT_CONTAINER_PATH,
+    PROVIDERS_ROOT_PATH,
     TEST_REPORT_FILE,
     XCOM_BUCKET,
 )
@@ -121,6 +125,94 @@ def _setup_opensearch_integration(dot_env_file, tmp_dir):
     os.environ["ENV_FILE_PATH"] = str(dot_env_file)
 
 
+def _copy_kafka_files(tmp_dir):
+    """Copy the Kafka compose file and broker init script into the temp 
directory."""
+    copyfile(KAFKA_DIR_PATH.parent / "kafka.yml", tmp_dir / "kafka.yml")
+
+    kafka_dir = tmp_dir / "kafka"
+    kafka_dir.mkdir()
+    copyfile(KAFKA_DIR_PATH / "update_run.sh", kafka_dir / "update_run.sh")
+    current_permissions = os.stat(kafka_dir / "update_run.sh").st_mode
+    os.chmod(kafka_dir / "update_run.sh", current_permissions | 0o111)
+
+
+def _write_providers_mount_override(tmp_dir: Path, providers: list[str]) -> 
list[str]:
+    """Write a docker-compose override that bind-mounts in-tree provider 
sources.
+
+    Each entry in ``providers`` is a provider id with dot-separated path 
segments (e.g.
+    ``"apache.kafka"``). The host source ``providers/<dotted/as/slashes>`` is 
mounted
+    read-only into every airflow service at 
``<PROVIDERS_MOUNT_CONTAINER_PATH>/<dashed>``.
+    Returns the list of in-container paths suitable for 
``_PIP_ADDITIONAL_REQUIREMENTS``
+    so pip installs the in-tree (latest, possibly unreleased) provider instead 
of the
+    PyPI release.
+    """
+    in_container_paths: list[str] = []
+    volume_entries: list[str] = []
+    for provider_id in providers:
+        host_path = PROVIDERS_ROOT_PATH / provider_id.replace(".", "/")
+        if not host_path.is_dir():
+            raise RuntimeError(f"Provider source directory not found: 
{host_path}")
+        container_path = 
f"{PROVIDERS_MOUNT_CONTAINER_PATH}/{provider_id.replace('.', '-')}"
+        in_container_paths.append(container_path)
+        volume_entries.append(f"      - {host_path}:{container_path}:ro")
+
+    volumes_block = "\n".join(volume_entries)
+    services_block = "\n".join(
+        f"  {svc}:\n    volumes:\n{volumes_block}" for svc in 
AIRFLOW_SERVICES_FOR_PROVIDER_MOUNT
+    )
+    (tmp_dir / 
"providers-mount.yml").write_text(f"---\nservices:\n{services_block}\n")
+    return in_container_paths
+
+
+def _setup_event_driven_integration(dot_env_file, tmp_dir):
+    _copy_kafka_files(tmp_dir)
+
+    # Install kafka and common-messaging providers from the in-tree sources so 
the
+    # test exercises the latest code even before a PyPI release is cut.
+    provider_paths = _write_providers_mount_override(tmp_dir, ["apache.kafka", 
"common.messaging"])
+
+    kafka_conn = json.dumps(
+        {
+            "conn_type": "kafka",
+            "extra": {
+                "bootstrap.servers": "broker:29092",
+                "group.id": "kafka_default_group",
+                "security.protocol": "PLAINTEXT",
+                "enable.auto.commit": False,
+                "auto.offset.reset": "latest",
+            },
+        }
+    )
+
+    dot_env_file.write_text(
+        f"AIRFLOW_UID={os.getuid()}\n"
+        f"AIRFLOW_CONN_KAFKA_DEFAULT='{kafka_conn}'\n"
+        f"_PIP_ADDITIONAL_REQUIREMENTS={' '.join(provider_paths)}\n"
+    )
+    os.environ["ENV_FILE_PATH"] = str(dot_env_file)
+
+
+def _create_kafka_topics(compose_instance):
+    """Create Kafka topics required by the event-driven Dag."""
+    for topic in ("fizz_buzz", "dlq"):
+        compose_instance.exec_in_container(
+            command=[
+                "kafka-topics",
+                "--bootstrap-server",
+                "broker:29092",
+                "--create",
+                "--topic",
+                topic,
+                "--partitions",
+                "1",
+                "--replication-factor",
+                "1",
+                "--if-not-exists",
+            ],
+            service_name="broker",
+        )
+
+
 def _setup_xcom_object_storage_integration(dot_env_file, tmp_dir):
     _copy_localstack_files(tmp_dir)
 
@@ -180,6 +272,9 @@ def spin_up_airflow_environment(tmp_path_factory: 
pytest.TempPathFactory):
     elif E2E_TEST_MODE == "xcom_object_storage":
         compose_file_names.append("localstack.yml")
         _setup_xcom_object_storage_integration(dot_env_file, tmp_dir)
+    elif E2E_TEST_MODE == "event_driven":
+        compose_file_names.extend(["kafka.yml", "providers-mount.yml"])
+        _setup_event_driven_integration(dot_env_file, tmp_dir)
 
     #
     # Please Do not use this Fernet key in any deployments! Please generate 
your own key.
@@ -204,6 +299,10 @@ def spin_up_airflow_environment(tmp_path_factory: 
pytest.TempPathFactory):
             command=["airflow", "dags", "reserialize"], 
service_name="airflow-dag-processor"
         )
 
+        if E2E_TEST_MODE == "event_driven":
+            console.print("[yellow]Creating Kafka topics...")
+            _create_kafka_topics(_E2ETestState.compose_instance)
+
     except Exception:
         console.print("[red]Failed to start docker compose")
         if _E2ETestState.compose_instance:
@@ -265,6 +364,12 @@ def pytest_sessionfinish(session: pytest.Session, 
exitstatus: int | pytest.ExitC
             _E2ETestState.compose_instance.stop()
 
 
[email protected](scope="session")
+def compose_instance():
+    """Provide access to the running Docker Compose instance."""
+    return _E2ETestState.compose_instance
+
+
 def generate_test_report(results):
     """Generate test report with json summary."""
     report = {
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py 
b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
index a2d99aeabab..7e27dd6edc6 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
@@ -47,3 +47,18 @@ AWS_INIT_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / 
"scripts" / "init-aws.
 
 # s3 bucket name for XComObjectStorageBackend tests. This bucket will be 
created in the `init-aws.sh` script that is run as part of the LocalStack 
container initialization.
 XCOM_BUCKET = "test-xcom-objectstorage-backend"
+
+KAFKA_DIR_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / "kafka"
+
+# Local provider sources are mounted into the airflow containers under this 
directory so
+# ``_PIP_ADDITIONAL_REQUIREMENTS`` can install the in-tree (latest, possibly 
unreleased)
+# provider rather than the published one from PyPI.
+PROVIDERS_ROOT_PATH = AIRFLOW_ROOT_PATH / "providers"
+PROVIDERS_MOUNT_CONTAINER_PATH = "/opt/airflow-providers"
+AIRFLOW_SERVICES_FOR_PROVIDER_MOUNT = (
+    "airflow-apiserver",
+    "airflow-scheduler",
+    "airflow-dag-processor",
+    "airflow-worker",
+    "airflow-triggerer",
+)
diff --git 
a/airflow-e2e-tests/tests/airflow_e2e_tests/dags/example_event_driven.py 
b/airflow-e2e-tests/tests/airflow_e2e_tests/dags/example_event_driven.py
new file mode 100644
index 00000000000..f8e61de37f5
--- /dev/null
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/dags/example_event_driven.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+from datetime import timedelta
+from typing import TYPE_CHECKING, cast
+
+import pendulum
+
+from airflow.providers.apache.kafka.operators.produce import 
ProduceToTopicOperator
+from airflow.providers.common.messaging.triggers.msg_queue import 
MessageQueueTrigger
+from airflow.sdk import Asset, AssetWatcher, dag, get_current_context, task
+
+if TYPE_CHECKING:
+    from airflow.sdk.execution_time.context import Context, 
TriggeringAssetEventsAccessor
+
+KAFKA_CONFIG_ID = "kafka_default"
+TOPICS = ["fizz_buzz"]
+DLQ_TOPIC = "dlq"
+RETRY_COUNT = 3
+
+# Airflow Kafka connection:
+# AIRFLOW_CONN_KAFKA_DEFAULT='{
+#   "conn_type": "general",
+#   "extra": {
+#     "bootstrap.servers": "broker:29092",
+#     "group.id": "kafka_default_group",
+#     "security.protocol": "PLAINTEXT",
+#     "enable.auto.commit": false,
+#     "auto.offset.reset": "latest"
+#   }
+# }'
+#
+# Kafka commands to verify messages are being produced to the topic:
+#
+# Create Topic
+# /bin/kafka-topics --bootstrap-server broker:29092 --create --topic fizz_buzz
+# /bin/kafka-topics --bootstrap-server broker:29092 --create --topic dlq
+#
+# Get offsets for the topic to verify messages are being produced
+# /bin/kafka-get-offsets --bootstrap-server broker:29092 --topic fizz_buzz
+# /bin/kafka-get-offsets --bootstrap-server broker:29092 --topic dlq
+#
+# List consumer groups to verify our consumer group is being registered
+# /bin/kafka-consumer-groups --bootstrap-server broker:29092 --list
+#
+# Get current offsets for the consumer group to verify messages are being 
consumed
+# /bin/kafka-consumer-groups --bootstrap-server broker:29092 --describe 
--group group_1
+
+SAMPLE_ORDERS = [
+    {"order_id": "ORD-1001", "customer": "alice", "item": "widget", 
"quantity": 2, "price": 9.99},
+    {"order_id": "ORD-1002", "customer": "bob", "item": "gadget", "quantity": 
1, "price": 24.50},
+    {"order_id": "ORD-1003", "customer": "carol", "item": "widget", 
"quantity": 5, "price": 9.99},
+    {"order_id": "ORD-1004", "customer": "dave", "item": "doohickey", 
"quantity": 3, "price": 14.75},
+    {"order_id": "ORD-1005", "customer": "eve", "item": "thingamajig", 
"quantity": 1, "price": 39.00},
+    {"order_id": "ORD-1006", "customer": "frank", "item": "widget", 
"quantity": 10, "price": 9.99},
+    {"order_id": "ORD-1007", "customer": "grace", "item": "gadget", 
"quantity": 2, "price": 24.50},
+    {"order_id": "ORD-1008", "customer": "heidi", "item": "doohickey", 
"quantity": 1, "price": 14.75},
+]
+
+
+def producer_function():
+    for order in SAMPLE_ORDERS:
+        yield (json.dumps(order["order_id"]), json.dumps(order))
+    # produce a malformed message to demonstrate error handling
+    yield ("malformed_message", "malformed_message")
+
+
+def process_one_message(message: str):
+    order = json.loads(message)
+    total = order["quantity"] * order["price"]
+    print(f"Order {order['order_id']}: {order['quantity']}x {order['item']} = 
${total:.2f}")
+    return order
+
+
+def handle_dlq():
+    context: Context = get_current_context()
+    triggering_asset_events: TriggeringAssetEventsAccessor = 
context["triggering_asset_events"]
+    for event in triggering_asset_events[kafka_cdc_asset]:
+        print(f"Handling failed message from event: {event}")
+        value = json.dumps(
+            {
+                "asset": event.asset.model_dump(mode="json"),
+                "extra": event.extra,
+            }
+        )
+        yield (json.dumps(event.asset.uri), value)
+
+
+# Airflow 3 example
+# Define a trigger that listens to an external message queue (Apache Kafka in 
this case)
+trigger = MessageQueueTrigger(
+    scheme="kafka",
+    # the rest of the parameters are used by the trigger
+    kafka_config_id=KAFKA_CONFIG_ID,
+    topics=TOPICS,
+    poll_interval=1,
+    poll_timeout=1,
+    commit_offset=True,
+)
+
+# Define an asset that watches for messages on the queue
+kafka_cdc_asset = Asset("kafka_cdc_asset", 
watchers=[AssetWatcher(name="kafka_cdc", trigger=trigger)])
+
+
+@dag(
+    schedule=[kafka_cdc_asset],
+    tags=["event-driven"],
+)
+def event_driven_consumer():
+
+    @task(retries=RETRY_COUNT, retry_delay=timedelta(seconds=1))
+    def process_message(**context) -> bool:
+        # Extract the triggering asset events from the context
+        triggering_asset_events: TriggeringAssetEventsAccessor = 
context["triggering_asset_events"]
+        for event in triggering_asset_events[kafka_cdc_asset]:
+            # Get the message from the TriggerEvent payload
+            print(f"Asset event: {event}")
+            process_one_message(cast("str", event.extra["payload"]))
+        return True
+
+    @task.short_circuit(trigger_rule="all_done")
+    def should_handle_dlq(**context) -> bool:
+        """Skip DLQ handling if processing succeeded."""
+        # If process_message succeeded, it pushed True to XCom.
+        # If it failed (exception after retries), no XCom was pushed -> None.
+        upstream_result = context["ti"].xcom_pull(task_ids="process_message")
+        return upstream_result is None
+
+    result = process_message()
+    dlq_check = should_handle_dlq()
+    handle_dlq_task = ProduceToTopicOperator(
+        kafka_config_id=KAFKA_CONFIG_ID,
+        task_id="handle_dlq",
+        topic=DLQ_TOPIC,
+        producer_function=handle_dlq,
+    )
+
+    result >> dlq_check >> handle_dlq_task
+
+
+event_driven_consumer()
+
+
+@dag(
+    description="Load Data to fizz_buzz topic",
+    start_date=pendulum.datetime(2022, 11, 1),
+    schedule=None,
+    catchup=False,
+    tags=["event-driven"],
+)
+def event_driven_producer():
+    ProduceToTopicOperator(
+        kafka_config_id=KAFKA_CONFIG_ID,
+        task_id="produce_to_topic",
+        topic=TOPICS[0],
+        producer_function=producer_function,
+    )
+
+
+event_driven_producer()
diff --git 
a/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py 
b/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py
index b0f6573b9dc..ff98eb18e6d 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py
@@ -101,7 +101,30 @@ class AirflowClient:
         response.raise_for_status()
         return response.json()
 
+    def get_dag(self, dag_id: str):
+        return self._make_request(method="GET", endpoint=f"dags/{dag_id}")
+
+    def wait_for_dag(self, dag_id: str, timeout: int = 120, check_interval: 
int = 3):
+        """Poll until *dag_id* is registered (parsed & serialized) and 
reachable via the API.
+
+        On remote CI runners the Dag processor may need extra time to parse & 
serialize
+        new Dag files, so calling ``un_pause_dag`` immediately can return a 
404. Retry
+        until the Dag exists or *timeout* seconds elapse.
+        """
+        start = time.monotonic()
+        last_error: Exception | None = None
+        while time.monotonic() - start < timeout:
+            try:
+                return self.get_dag(dag_id)
+            except requests.HTTPError as exc:
+                if exc.response is None or exc.response.status_code != 404:
+                    raise
+                last_error = exc
+            time.sleep(check_interval)
+        raise TimeoutError(f"Dag {dag_id} was not registered within 
{timeout}s. Last error: {last_error}")
+
     def un_pause_dag(self, dag_id: str):
+        self.wait_for_dag(dag_id)
         return self._make_request(
             method="PATCH",
             endpoint=f"dags/{dag_id}",
@@ -154,6 +177,13 @@ class AirflowClient:
             endpoint=f"dags/{dag_id}/dagRuns/{run_id}/taskInstances",
         )
 
+    def list_dag_runs(self, dag_id: str, limit: int = 100):
+        """List Dag runs for a given Dag."""
+        return self._make_request(
+            method="GET",
+            endpoint=f"dags/{dag_id}/dagRuns?limit={limit}",
+        )
+
     def get_task_logs(
         self, dag_id: str, run_id: str, task_id: str, try_number: int = 1, 
map_index: int | None = None
     ):
diff --git 
a/airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/__init__.py 
b/airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/test_event_driven.py
 
b/airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/test_event_driven.py
new file mode 100644
index 00000000000..34fc0092c78
--- /dev/null
+++ 
b/airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/test_event_driven.py
@@ -0,0 +1,229 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""E2E tests for the event-driven Dag pattern using Apache Kafka.
+
+The producer Dag sends 8 valid orders + 1 malformed message to the 
``fizz_buzz``
+Kafka topic. The consumer Dag is scheduled on an Asset with a Kafka 
AssetWatcher,
+so each message triggers a separate consumer Dag run (9 total).
+"""
+
+from __future__ import annotations
+
+import time
+
+from airflow_e2e_tests.e2e_test_utils.clients import AirflowClient
+
+PRODUCER_DAG_ID = "event_driven_producer"
+CONSUMER_DAG_ID = "event_driven_consumer"
+# 8 valid orders + 1 malformed message
+EXPECTED_CONSUMER_RUNS = 9
+EXPECTED_FIZZ_BUZZ_OFFSET = 9
+EXPECTED_DLQ_OFFSET = 1
+
+
+def _parse_topic_offset(raw_output: str, topic: str) -> int:
+    """Parse ``kafka-get-offsets`` output (``topic:partition:offset``) and 
return the offset."""
+    for line in raw_output.strip().splitlines():
+        parts = line.strip().split(":")
+        if len(parts) == 3 and parts[0] == topic:
+            return int(parts[2])
+    raise ValueError(f"Could not find offset for topic '{topic}' in 
output:\n{raw_output}")
+
+
+class TestEventDrivenDag:
+    """Test the Kafka-based event-driven producer/consumer Dag pair."""
+
+    airflow_client = AirflowClient()
+
+    def _wait_for_kafka_consumer_group(self, compose_instance, timeout: int = 
60, check_interval: int = 3):
+        """Poll until any Kafka consumer group is registered, indicating the 
trigger is active."""
+        start = time.monotonic()
+        while time.monotonic() - start < timeout:
+            stdout, _, _ = compose_instance.exec_in_container(
+                command=[
+                    "kafka-consumer-groups",
+                    "--bootstrap-server",
+                    "broker:29092",
+                    "--list",
+                ],
+                service_name="broker",
+            )
+            output = stdout.decode() if isinstance(stdout, bytes) else stdout
+            # Any non-empty group listing means the trigger's consumer has 
registered
+            groups = [line.strip() for line in output.strip().splitlines() if 
line.strip()]
+            if groups:
+                return
+            time.sleep(check_interval)
+        raise TimeoutError(f"No Kafka consumer group registered within 
{timeout}s")
+
+    def _wait_for_consumer_dag_runs(
+        self, expected_count: int, timeout: int = 600, check_interval: int = 10
+    ) -> list[dict]:
+        """Poll until *expected_count* consumer Dag runs reach a terminal 
state."""
+        start = time.monotonic()
+        while time.monotonic() - start < timeout:
+            response = self.airflow_client.list_dag_runs(CONSUMER_DAG_ID)
+            runs = response.get("dag_runs", [])
+            terminal_runs = [r for r in runs if r["state"] in ("success", 
"failed")]
+            if len(terminal_runs) >= expected_count:
+                # Return only the most recent expected_count runs to avoid
+                # assertion failures if extra runs exist from retries or prior 
state.
+                terminal_runs.sort(
+                    key=lambda r: (
+                        r.get("end_date") or "",
+                        r.get("start_date") or "",
+                        r.get("logical_date") or "",
+                        r.get("dag_run_id") or "",
+                    ),
+                    reverse=True,
+                )
+                return terminal_runs[:expected_count]
+            time.sleep(check_interval)
+
+        # Timed out — gather diagnostics
+        response = self.airflow_client.list_dag_runs(CONSUMER_DAG_ID)
+        runs = response.get("dag_runs", [])
+        states = {r["dag_run_id"]: r["state"] for r in runs}
+        raise TimeoutError(
+            f"Expected {expected_count} terminal consumer Dag runs within 
{timeout}s, "
+            f"but only found {len([r for r in runs if r['state'] in 
('success', 'failed')])}. "
+            f"Run states: {states}"
+        )
+
+    def _get_topic_offset(self, compose_instance, topic: str) -> int:
+        """Return the current end-offset of *topic* via ``kafka-get-offsets`` 
inside the broker."""
+        stdout, _, _ = compose_instance.exec_in_container(
+            command=[
+                "kafka-get-offsets",
+                "--bootstrap-server",
+                "broker:29092",
+                "--topic",
+                topic,
+            ],
+            service_name="broker",
+        )
+        output = stdout.decode() if isinstance(stdout, bytes) else stdout
+        return _parse_topic_offset(output, topic)
+
+    def _wait_for_topic_offset(
+        self, compose_instance, topic: str, expected_offset: int, timeout: int 
= 30, check_interval: int = 2
+    ) -> int:
+        """Poll until *topic* reaches *expected_offset*, then return the 
offset."""
+        start = time.monotonic()
+        offset = self._get_topic_offset(compose_instance, topic)
+        while offset < expected_offset and time.monotonic() - start < timeout:
+            time.sleep(check_interval)
+            offset = self._get_topic_offset(compose_instance, topic)
+        return offset
+
+    def _get_task_states(self, run_id: str) -> dict[str, str]:
+        """Return a mapping of task_id -> state for a consumer Dag run."""
+        response = self.airflow_client.get_task_instances(CONSUMER_DAG_ID, 
run_id)
+        return {ti["task_id"]: ti["state"] for ti in 
response["task_instances"]}
+
+    def test_producer_triggers_consumer_and_kafka_offsets(self, 
compose_instance):
+        """Trigger the producer once and verify 9 consumer runs and Kafka 
offsets.
+
+        Steps:
+        1. Unpause the consumer Dag so the triggerer starts the AssetWatcher.
+        2. Wait for the Kafka MessageQueueTrigger to begin polling.
+        3. Trigger the producer Dag and wait for it to succeed.
+        4. Wait for 9 consumer Dag runs to reach a terminal state.
+        5. All 9 Dag runs succeed. Verify task-level behavior:
+           - 1 run has a failed ``process_message`` task and executes 
``handle_dlq``.
+           - 8 runs succeed on ``process_message`` and skip ``handle_dlq``.
+        6. Verify that the ``fizz_buzz`` topic has offset 9 (all messages 
produced).
+        7. Verify that the ``dlq`` topic has offset 1 (the malformed message).
+        """
+        # 1. Unpause consumer so the triggerer registers the AssetWatcher
+        self.airflow_client.un_pause_dag(CONSUMER_DAG_ID)
+
+        # 2. Wait for the triggerer to start the MessageQueueTrigger and 
subscribe.
+        #    The trigger uses poll_interval=1 and auto.offset.reset=latest so 
it
+        #    must be actively polling before the producer writes.
+        self._wait_for_kafka_consumer_group(compose_instance)
+
+        # 3. Trigger producer and wait for it to complete
+        producer_state = 
self.airflow_client.trigger_dag_and_wait(PRODUCER_DAG_ID)
+        assert producer_state == "success", f"Producer Dag did not succeed. 
Final state: {producer_state}"
+
+        # 4. Wait for all 9 consumer Dag runs
+        consumer_runs = 
self._wait_for_consumer_dag_runs(expected_count=EXPECTED_CONSUMER_RUNS)
+        assert len(consumer_runs) == EXPECTED_CONSUMER_RUNS, (
+            f"Expected {EXPECTED_CONSUMER_RUNS} consumer runs, got 
{len(consumer_runs)}"
+        )
+
+        # 5. All 9 Dag runs should succeed
+        for run in consumer_runs:
+            assert run["state"] == "success", (
+                f"Expected all consumer runs to succeed, but run 
{run['dag_run_id']} "
+                f"has state '{run['state']}'"
+            )
+
+        # 6. Verify task-level behavior per run:
+        #    - 1 run: process_message fails (malformed message), handle_dlq 
executes
+        #    - 8 runs: process_message succeeds, handle_dlq is skipped
+        dlq_runs = []
+        normal_runs = []
+        for run in consumer_runs:
+            ti_states = self._get_task_states(run["dag_run_id"])
+            if ti_states.get("process_message") == "failed":
+                dlq_runs.append(run["dag_run_id"])
+                assert ti_states.get("should_handle_dlq") == "success", (
+                    f"Run {run['dag_run_id']}: expected 
should_handle_dlq=success, "
+                    f"got '{ti_states.get('should_handle_dlq')}'"
+                )
+                assert ti_states.get("handle_dlq") == "success", (
+                    f"Run {run['dag_run_id']}: expected handle_dlq=success, "
+                    f"got '{ti_states.get('handle_dlq')}'"
+                )
+            else:
+                normal_runs.append(run["dag_run_id"])
+                assert ti_states.get("process_message") == "success", (
+                    f"Run {run['dag_run_id']}: expected 
process_message=success, "
+                    f"got '{ti_states.get('process_message')}'"
+                )
+                assert ti_states.get("should_handle_dlq") == "success", (
+                    f"Run {run['dag_run_id']}: expected 
should_handle_dlq=success, "
+                    f"got '{ti_states.get('should_handle_dlq')}'"
+                )
+                assert ti_states.get("handle_dlq") == "skipped", (
+                    f"Run {run['dag_run_id']}: expected handle_dlq=skipped, "
+                    f"got '{ti_states.get('handle_dlq')}'"
+                )
+
+        assert len(dlq_runs) == 1, (
+            f"Expected exactly 1 run with failed process_message (DLQ path), 
got {len(dlq_runs)}: {dlq_runs}"
+        )
+        assert len(normal_runs) == 8, (
+            f"Expected 8 runs with successful process_message, got 
{len(normal_runs)}: {normal_runs}"
+        )
+
+        # 7. Verify Kafka topic offsets
+        #    The DLQ message is produced by the last consumer run to complete, 
so
+        #    kafka-get-offsets may briefly report a stale offset; poll with a 
short timeout.
+        fizz_buzz_offset = self._wait_for_topic_offset(
+            compose_instance, "fizz_buzz", EXPECTED_FIZZ_BUZZ_OFFSET
+        )
+        assert fizz_buzz_offset == EXPECTED_FIZZ_BUZZ_OFFSET, (
+            f"Expected fizz_buzz offset {EXPECTED_FIZZ_BUZZ_OFFSET}, got 
{fizz_buzz_offset}"
+        )
+
+        dlq_offset = self._wait_for_topic_offset(compose_instance, "dlq", 
EXPECTED_DLQ_OFFSET)
+        assert dlq_offset == EXPECTED_DLQ_OFFSET, (
+            f"Expected dlq offset {EXPECTED_DLQ_OFFSET}, got {dlq_offset}"
+        )
diff --git a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg 
b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
index 1add4b96322..b6a293c2ec9 100644
--- a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
+++ b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
@@ -135,7 +135,7 @@
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="312.8" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="312.8" 
textLength="73.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">(TEXT)</text><text 
class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="312.8" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">│</text><text 
class="breez [...]
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="337.2" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-13)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r4" x="24.4" y="337.2" textLength="366" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-13)">--e2e-test-mode&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;</text><text
 class="breeze-testing-airflow-e2e-tests-r1" x="463.6" y="337.2" textLen [...]
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="361.6" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="361.6" 
textLength="976" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">(basic|remote_log|remote_log_elasticsearch|remote_log_opensearch|xcom_object_sto</text><text
 class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="361.6" 
textLength="12.2" clip-path="u [...]
-</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="386" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="386" textLength="61" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">rage)</text><text 
class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="386" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</text><text 
class="breeze-testing [...]
+</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="386" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="386" 
textLength="219.6" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">rage|event_driven)</text><text
 class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="386" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</text><text class 
[...]
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="410.4" 
textLength="1464" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-16)">╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯</text><text
 class="breeze-testing-airflow-e2e-tests-r1" x="1464" y="410.4" 
textLength="12.2" clip-path="url(#breeze-testing-airflow-e2e-tests-line-16)">
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="434.8" 
textLength="24.4" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">╭─</text><text 
class="breeze-testing-airflow-e2e-tests-r5" x="24.4" y="434.8" 
textLength="195.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">&#160;Common&#160;options&#160;</text><text
 class="breeze-testing-airflow-e2e-tests-r5" x="219.6" y="434.8" 
textLength="1220" clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">─ 
[...]
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="459.2" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-18)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r4" x="24.4" y="459.2" 
textLength="109.8" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-18)">--verbose</text><text
 class="breeze-testing-airflow-e2e-tests-r6" x="158.6" y="459.2" 
textLength="24.4" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-18)">-v</text><text 
class="br [...]
diff --git a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt 
b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
index 4648ab65601..264b35b577b 100644
--- a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
+++ b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
@@ -1 +1 @@
-0e0a5500aa42f7857d251cf13a1043ed
+19ec59301c48da362cdb7e2bcef8c89f
diff --git a/dev/breeze/src/airflow_breeze/commands/testing_commands.py 
b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
index a29b1471ecf..bb39ae28a42 100644
--- a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
@@ -1413,7 +1413,14 @@ option_e2e_test_mode = click.option(
     show_default=True,
     envvar="E2E_TEST_MODE",
     type=click.Choice(
-        ["basic", "remote_log", "remote_log_elasticsearch", 
"remote_log_opensearch", "xcom_object_storage"],
+        [
+            "basic",
+            "remote_log",
+            "remote_log_elasticsearch",
+            "remote_log_opensearch",
+            "xcom_object_storage",
+            "event_driven",
+        ],
         case_sensitive=False,
     ),
 )
diff --git a/dev/breeze/src/airflow_breeze/utils/selective_checks.py 
b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
index 8c055baadfb..5e2a9169990 100644
--- a/dev/breeze/src/airflow_breeze/utils/selective_checks.py
+++ b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
@@ -123,6 +123,7 @@ class FileGroupForCi(Enum):
     REMOTE_LOGGING_E2E_S3_FILES = auto()
     REMOTE_LOGGING_E2E_ELASTICSEARCH_FILES = auto()
     REMOTE_LOGGING_E2E_OPENSEARCH_FILES = auto()
+    EVENT_DRIVEN_E2E_FILES = auto()
     ALL_PYPROJECT_TOML_FILES = auto()
     ALL_PYTHON_FILES = auto()
     ALL_SOURCE_FILES = auto()
@@ -204,6 +205,14 @@ CI_FILE_GROUP_MATCHES: HashableDict[FileGroupForCi] = 
HashableDict(
             
r"^airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_opensearch_tests/.*",
             r"^providers/opensearch/.*",
         ],
+        FileGroupForCi.EVENT_DRIVEN_E2E_FILES: [
+            
r"^airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/.*",
+            
r"^airflow-e2e-tests/tests/airflow_e2e_tests/dags/example_event_driven\.py$",
+            r"^airflow-e2e-tests/docker/kafka/.*",
+            r"^airflow-e2e-tests/docker/kafka\.yml$",
+            r"^providers/apache/kafka/.*",
+            r"^providers/common/messaging/.*",
+        ],
         FileGroupForCi.PYTHON_PRODUCTION_FILES: [
             r"^airflow-core/src/airflow/.*\.py",
             r"^providers/.*\.py",
@@ -973,6 +982,10 @@ class SelectiveChecks:
             FileGroupForCi.REMOTE_LOGGING_E2E_OPENSEARCH_FILES
         )
 
+    @cached_property
+    def run_event_driven_e2e_tests(self) -> bool:
+        return self._should_be_run(FileGroupForCi.EVENT_DRIVEN_E2E_FILES)
+
     @cached_property
     def run_amazon_tests(self) -> bool:
         if self.providers_test_types_list_as_strings_in_json == "[]":
@@ -1084,6 +1097,7 @@ class SelectiveChecks:
             or self.run_remote_logging_s3_e2e_tests
             or self.run_remote_logging_elasticsearch_e2e_tests
             or self.run_remote_logging_opensearch_e2e_tests
+            or self.run_event_driven_e2e_tests
             or self.run_ui_e2e_tests
         )
 

Reply via email to