This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 50fa403ed61 Add e2e test suite for Airflow event-driven DAGs with
Apache Kafka (#64833)
50fa403ed61 is described below
commit 50fa403ed618d1eec165993dc773567dcf31d673
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Thu May 14 13:32:03 2026 +0800
Add e2e test suite for Airflow event-driven DAGs with Apache Kafka (#64833)
* Add e2e test suite for Airflow event-driven DAGs with Apache Kafka
* Add 'event_driven' option to E2E test mode in testing_commands.py
* Address review feedback for event-driven E2E tests
- Use timedelta(seconds=1) for retry_delay instead of bare int
- Return exactly expected_count runs from _wait_for_consumer_dag_runs
- Replace fixed 30s sleep with polling for Kafka consumer group registration
- Pin --partitions 1 --replication-factor 1 for deterministic topic creation
- Add kafka.yml pattern to selective checks file group
- Remove container_name and unused JMX port from kafka.yml
- Regenerate breeze CLI doc images
* Fix AwaitTrigger and airflow e2e test
* Add retry for un_pause_dag method
* Add additional requirements for Kafka integration in event-driven setup
* Add pytest output
* fixup: Refactor comments and documentation for consistency in DAG
terminology and update regex patterns for Kafka provider paths
* fixup: Add entry in ci-amd.yml
* fixup: Remove debug -sv flag
* Apply suggestions from code review
Co-authored-by: Jens Scheffler <[email protected]>
* fixup: mount latest provider changes instead of installing from pypi
---------
Co-authored-by: Jens Scheffler <[email protected]>
---
.github/workflows/additional-prod-image-tests.yml | 16 ++
.github/workflows/airflow-e2e-tests.yml | 4 +-
.github/workflows/ci-amd.yml | 2 +
.github/workflows/ci-arm.yml | 2 +
airflow-e2e-tests/docker/kafka.yml | 56 +++++
airflow-e2e-tests/docker/kafka/update_run.sh | 27 +++
.../tests/airflow_e2e_tests/conftest.py | 105 ++++++++++
.../tests/airflow_e2e_tests/constants.py | 15 ++
.../airflow_e2e_tests/dags/example_event_driven.py | 177 ++++++++++++++++
.../airflow_e2e_tests/e2e_test_utils/clients.py | 30 +++
.../event_driven_tests/__init__.py | 16 ++
.../event_driven_tests/test_event_driven.py | 229 +++++++++++++++++++++
.../images/output_testing_airflow-e2e-tests.svg | 2 +-
.../images/output_testing_airflow-e2e-tests.txt | 2 +-
.../airflow_breeze/commands/testing_commands.py | 9 +-
.../src/airflow_breeze/utils/selective_checks.py | 14 ++
16 files changed, 701 insertions(+), 5 deletions(-)
diff --git a/.github/workflows/additional-prod-image-tests.yml
b/.github/workflows/additional-prod-image-tests.yml
index 00fd2edd989..5138c0994ab 100644
--- a/.github/workflows/additional-prod-image-tests.yml
+++ b/.github/workflows/additional-prod-image-tests.yml
@@ -48,6 +48,10 @@ on: # yamllint disable-line rule:truthy
description: "Whether to run OpenSearch remote logging e2e tests
(true/false)"
required: true
type: string
+ run-event-driven-e2e-tests:
+ description: "Whether to run event driven e2e tests (true/false)"
+ required: true
+ type: string
constraints-branch:
description: "Branch used to construct constraints URL from."
required: true
@@ -274,6 +278,18 @@ jobs:
use-uv: ${{ inputs.use-uv }}
e2e_test_mode: "xcom_object_storage"
+ test-e2e-integration-tests-event-driven:
+ name: "Event driven tests with PROD image"
+ uses: ./.github/workflows/airflow-e2e-tests.yml
+ with:
+ workflow-name: "Event driven e2e test"
+ runners: ${{ inputs.runners }}
+ platform: ${{ inputs.platform }}
+ default-python-version: "${{ inputs.default-python-version }}"
+ use-uv: ${{ inputs.use-uv }}
+ e2e_test_mode: "event_driven"
+ if: inputs.canary-run == 'true' || inputs.run-event-driven-e2e-tests ==
'true'
+
test-ui-e2e-chromium:
name: "Chromium UI e2e tests with PROD image"
uses: ./.github/workflows/ui-e2e-tests.yml
diff --git a/.github/workflows/airflow-e2e-tests.yml
b/.github/workflows/airflow-e2e-tests.yml
index 408529f38a4..e7f1a8c790c 100644
--- a/.github/workflows/airflow-e2e-tests.yml
+++ b/.github/workflows/airflow-e2e-tests.yml
@@ -49,7 +49,7 @@ on: # yamllint disable-line rule:truthy
type: string
required: true
e2e_test_mode:
- description: "Test mode - basic, remote_log, remote_log_elasticsearch,
remote_log_opensearch, or xcom_object_storage" # yamllint disable-line
rule:line-length
+ description: "Test mode - basic, remote_log, remote_log_elasticsearch,
remote_log_opensearch, xcom_object_storage, or event_driven" # yamllint
disable-line rule:line-length
type: string
default: "basic"
@@ -80,7 +80,7 @@ on: # yamllint disable-line rule:truthy
type: string
default: ""
e2e_test_mode:
- description: "Test mode - basic, remote_log, remote_log_elasticsearch,
remote_log_opensearch, or xcom_object_storage" # yamllint disable-line
rule:line-length
+ description: "Test mode - basic, remote_log, remote_log_elasticsearch,
remote_log_opensearch, xcom_object_storage, or event_driven" # yamllint
disable-line rule:line-length
type: string
default: "basic"
diff --git a/.github/workflows/ci-amd.yml b/.github/workflows/ci-amd.yml
index 1e79582747a..47b64293087 100644
--- a/.github/workflows/ci-amd.yml
+++ b/.github/workflows/ci-amd.yml
@@ -132,6 +132,7 @@ jobs:
run-remote-logging-elasticsearch-e2e-tests: ${{
steps.selective-checks.outputs.run-remote-logging-elasticsearch-e2e-tests }}
run-remote-logging-opensearch-e2e-tests: ${{
steps.selective-checks.outputs.run-remote-logging-opensearch-e2e-tests }}
run-remote-logging-s3-e2e-tests: ${{
steps.selective-checks.outputs.run-remote-logging-s3-e2e-tests }}
+ run-event-driven-e2e-tests: ${{
steps.selective-checks.outputs.run-event-driven-e2e-tests }}
run-system-tests: ${{ steps.selective-checks.outputs.run-system-tests }}
run-task-sdk-tests: ${{
steps.selective-checks.outputs.run-task-sdk-tests }}
run-task-sdk-integration-tests: ${{
steps.selective-checks.outputs.run-task-sdk-integration-tests }}
@@ -869,6 +870,7 @@ jobs:
run-remote-logging-elasticsearch-e2e-tests: ${{
needs.build-info.outputs.run-remote-logging-elasticsearch-e2e-tests }}
run-remote-logging-opensearch-e2e-tests: ${{
needs.build-info.outputs.run-remote-logging-opensearch-e2e-tests }}
run-remote-logging-s3-e2e-tests: ${{
needs.build-info.outputs.run-remote-logging-s3-e2e-tests }}
+ run-event-driven-e2e-tests: ${{
needs.build-info.outputs.run-event-driven-e2e-tests }}
use-uv: ${{ needs.build-info.outputs.use-uv }}
run-ui-e2e-tests: ${{ needs.build-info.outputs.run-ui-e2e-tests }}
run-airflow-ctl-integration-tests: ${{
needs.build-info.outputs.run-airflow-ctl-integration-tests }}
diff --git a/.github/workflows/ci-arm.yml b/.github/workflows/ci-arm.yml
index 2cf141afe7c..62ac6a2d99c 100644
--- a/.github/workflows/ci-arm.yml
+++ b/.github/workflows/ci-arm.yml
@@ -122,6 +122,7 @@ jobs:
run-remote-logging-elasticsearch-e2e-tests: ${{
steps.selective-checks.outputs.run-remote-logging-elasticsearch-e2e-tests }}
run-remote-logging-opensearch-e2e-tests: ${{
steps.selective-checks.outputs.run-remote-logging-opensearch-e2e-tests }}
run-remote-logging-s3-e2e-tests: ${{
steps.selective-checks.outputs.run-remote-logging-s3-e2e-tests }}
+ run-event-driven-e2e-tests: ${{
steps.selective-checks.outputs.run-event-driven-e2e-tests }}
run-system-tests: ${{ steps.selective-checks.outputs.run-system-tests }}
run-task-sdk-tests: ${{
steps.selective-checks.outputs.run-task-sdk-tests }}
run-task-sdk-integration-tests: ${{
steps.selective-checks.outputs.run-task-sdk-integration-tests }}
@@ -859,6 +860,7 @@ jobs:
run-remote-logging-elasticsearch-e2e-tests: ${{
needs.build-info.outputs.run-remote-logging-elasticsearch-e2e-tests }}
run-remote-logging-opensearch-e2e-tests: ${{
needs.build-info.outputs.run-remote-logging-opensearch-e2e-tests }}
run-remote-logging-s3-e2e-tests: ${{
needs.build-info.outputs.run-remote-logging-s3-e2e-tests }}
+ run-event-driven-e2e-tests: ${{
needs.build-info.outputs.run-event-driven-e2e-tests }}
use-uv: ${{ needs.build-info.outputs.use-uv }}
run-ui-e2e-tests: ${{ needs.build-info.outputs.run-ui-e2e-tests }}
run-airflow-ctl-integration-tests: ${{
needs.build-info.outputs.run-airflow-ctl-integration-tests }}
diff --git a/airflow-e2e-tests/docker/kafka.yml
b/airflow-e2e-tests/docker/kafka.yml
new file mode 100644
index 00000000000..216f34f66d0
--- /dev/null
+++ b/airflow-e2e-tests/docker/kafka.yml
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
---
# Single Kafka node used by the ``event_driven`` e2e mode. It runs in KRaft mode
# (combined broker + controller, no ZooKeeper); kafka/update_run.sh patches the
# image's startup scripts for that before the image's normal run script starts.
services:
  broker:
    image: confluentinc/cp-kafka:7.3.0
    labels:
      breeze.description: "Integration required for Kafka hooks."
    hostname: broker
    ports:
      # Host access through the PLAINTEXT_HOST listener. Containers on the compose
      # network (Airflow, topic creation in conftest) use broker:29092 instead.
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: >
        CONTROLLER:PLAINTEXT,
        PLAINTEXT:PLAINTEXT,
        PLAINTEXT_HOST:PLAINTEXT
      # Addresses handed to clients: the in-network hostname for PLAINTEXT and
      # localhost for the host-mapped PLAINTEXT_HOST listener.
      KAFKA_ADVERTISED_LISTENERS: >
        PLAINTEXT://broker:29092,
        PLAINTEXT_HOST://localhost:9092
      # There is only one broker, so internal topics use replication factor / min ISR 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # Do not delay the first consumer-group rebalance.
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      # JMX is configured inside the container only; port 9101 is not published above.
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      # KRaft: this node is both broker and the only controller-quorum voter (id 1, port 29093).
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: >
        PLAINTEXT://broker:29092,
        CONTROLLER://broker:29093,
        PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - ./kafka/update_run.sh:/tmp/update_run.sh
    # Fail fast if the patch script was not mounted; otherwise apply it and only then
    # hand over to the image's normal run script.
    command: >
      bash -c 'if [ ! -f /tmp/update_run.sh ];
      then echo "ERROR: update_run.sh not mounted?"
      && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'
diff --git a/airflow-e2e-tests/docker/kafka/update_run.sh
b/airflow-e2e-tests/docker/kafka/update_run.sh
new file mode 100755
index 00000000000..d12bdb332b3
--- /dev/null
+++ b/airflow-e2e-tests/docker/kafka/update_run.sh
@@ -0,0 +1,27 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
# Abort on the first failing step. kafka.yml only starts the broker when this
# script succeeds, so a failed patch surfaces immediately as a start-up error
# instead of as a confusing ZooKeeper-related broker failure later.
set -e

# Docker workaround: Remove check for KAFKA_ZOOKEEPER_CONNECT parameter
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure

# Docker workaround: Ignore cub zk-ready
sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure

# KRaft required step: Format the storage directory with a new cluster ID.
# The ID is generated now (command substitution inside double quotes), so the
# appended line carries a fixed ID; --ignore-formatted skips directories that
# are already formatted.
echo "kafka-storage format --ignore-formatted --cluster-id=$(kafka-storage random-uuid) -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
index 4443f7d8a28..a6554606b80 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
@@ -27,6 +27,7 @@ from rich.console import Console
from testcontainers.compose import DockerCompose
from airflow_e2e_tests.constants import (
+ AIRFLOW_SERVICES_FOR_PROVIDER_MOUNT,
AWS_INIT_PATH,
DOCKER_COMPOSE_HOST_PORT,
DOCKER_COMPOSE_PATH,
@@ -34,9 +35,12 @@ from airflow_e2e_tests.constants import (
E2E_DAGS_FOLDER,
E2E_TEST_MODE,
ELASTICSEARCH_PATH,
+ KAFKA_DIR_PATH,
LOCALSTACK_PATH,
LOGS_FOLDER,
OPENSEARCH_PATH,
+ PROVIDERS_MOUNT_CONTAINER_PATH,
+ PROVIDERS_ROOT_PATH,
TEST_REPORT_FILE,
XCOM_BUCKET,
)
@@ -121,6 +125,94 @@ def _setup_opensearch_integration(dot_env_file, tmp_dir):
os.environ["ENV_FILE_PATH"] = str(dot_env_file)
def _copy_kafka_files(tmp_dir):
    """Stage the Kafka compose file and the broker start-up script in *tmp_dir*.

    ``kafka.yml`` is copied directly into *tmp_dir*. ``update_run.sh`` goes into a
    new ``kafka/`` sub-directory, matching the relative bind mount in ``kafka.yml``,
    and gets the execute bit for user, group and others.
    """
    compose_source = KAFKA_DIR_PATH.parent / "kafka.yml"
    copyfile(compose_source, tmp_dir / "kafka.yml")

    script_dir = tmp_dir / "kafka"
    script_dir.mkdir()
    script_path = script_dir / "update_run.sh"
    copyfile(KAFKA_DIR_PATH / "update_run.sh", script_path)
    # copyfile copies contents only, not the mode, so add the execute bits explicitly.
    existing_mode = os.stat(script_path).st_mode
    os.chmod(script_path, existing_mode | 0o111)
+
+
def _write_providers_mount_override(tmp_dir: Path, providers: list[str]) -> list[str]:
    """Write ``providers-mount.yml``, a compose override that bind-mounts in-tree provider sources.

    Each entry in ``providers`` is a provider id with dot-separated path segments
    (e.g. ``"apache.kafka"``). The host source ``providers/<dotted/as/slashes>`` is
    mounted read-only into every service in ``AIRFLOW_SERVICES_FOR_PROVIDER_MOUNT`` at
    ``<PROVIDERS_MOUNT_CONTAINER_PATH>/<dashed>``.

    :param tmp_dir: Directory the override file is written into.
    :param providers: Provider ids to mount. An empty list still yields a valid
        override in which every service gets ``volumes: []``.
    :return: The in-container paths, in ``providers`` order, suitable for
        ``_PIP_ADDITIONAL_REQUIREMENTS`` so pip installs the in-tree (latest, possibly
        unreleased) provider instead of the PyPI release.
    :raises RuntimeError: If a provider's source directory does not exist.
    """
    in_container_paths: list[str] = []
    volume_entries: list[str] = []
    for provider_id in providers:
        host_path = PROVIDERS_ROOT_PATH / provider_id.replace(".", "/")
        if not host_path.is_dir():
            raise RuntimeError(f"Provider source directory not found: {host_path}")
        container_path = f"{PROVIDERS_MOUNT_CONTAINER_PATH}/{provider_id.replace('.', '-')}"
        in_container_paths.append(container_path)
        mount_spec = f"{host_path}:{container_path}:ro"
        # A JSON string literal is a valid YAML double-quoted scalar, so characters in
        # the host path such as '#' or leading spaces cannot be misparsed.
        volume_entries.append(f"      - {json.dumps(mount_spec)}")

    # An empty item list would parse as ``volumes: null``, which compose rejects;
    # emit an explicit empty list instead.
    volumes_block = ("\n" + "\n".join(volume_entries)) if volume_entries else " []"
    services_block = "\n".join(
        f"  {svc}:\n    volumes:{volumes_block}" for svc in AIRFLOW_SERVICES_FOR_PROVIDER_MOUNT
    )
    (tmp_dir / "providers-mount.yml").write_text(f"---\nservices:\n{services_block}\n")
    return in_container_paths
+
+
def _setup_event_driven_integration(dot_env_file, tmp_dir):
    """Prepare the compose inputs for the ``event_driven`` e2e mode.

    Stages the Kafka compose file and broker script and writes the provider-mount
    override. Then writes *dot_env_file* with three entries: the Airflow UID, a
    ``kafka_default`` connection pointing at the in-network broker, and
    ``_PIP_ADDITIONAL_REQUIREMENTS`` listing the mounted provider paths. Finally
    exports ``ENV_FILE_PATH``.
    """
    _copy_kafka_files(tmp_dir)

    # Install the kafka and common-messaging providers from the in-tree sources, so the
    # test exercises the latest code even before a PyPI release is cut.
    mounted_paths = _write_providers_mount_override(tmp_dir, ["apache.kafka", "common.messaging"])

    connection_extra = {
        "bootstrap.servers": "broker:29092",
        "group.id": "kafka_default_group",
        "security.protocol": "PLAINTEXT",
        "enable.auto.commit": False,
        "auto.offset.reset": "latest",
    }
    kafka_conn = json.dumps({"conn_type": "kafka", "extra": connection_extra})

    env_lines = [
        f"AIRFLOW_UID={os.getuid()}",
        f"AIRFLOW_CONN_KAFKA_DEFAULT='{kafka_conn}'",
        "_PIP_ADDITIONAL_REQUIREMENTS=" + " ".join(mounted_paths),
    ]
    dot_env_file.write_text("".join(entry + "\n" for entry in env_lines))
    os.environ["ENV_FILE_PATH"] = str(dot_env_file)
+
+
def _create_kafka_topics(compose_instance):
    """Create the topics used by the event-driven Dag: ``fizz_buzz`` and ``dlq``.

    Runs ``kafka-topics --create`` once per topic inside the ``broker`` service, each
    with one partition and replication factor 1. ``--if-not-exists`` makes the call
    safe when the topic is already there.
    """
    create_prefix = ["kafka-topics", "--bootstrap-server", "broker:29092", "--create", "--topic"]
    topic_settings = ["--partitions", "1", "--replication-factor", "1", "--if-not-exists"]
    for topic_name in ("fizz_buzz", "dlq"):
        compose_instance.exec_in_container(
            command=[*create_prefix, topic_name, *topic_settings],
            service_name="broker",
        )
+
+
def _setup_xcom_object_storage_integration(dot_env_file, tmp_dir):
_copy_localstack_files(tmp_dir)
@@ -180,6 +272,9 @@ def spin_up_airflow_environment(tmp_path_factory:
pytest.TempPathFactory):
elif E2E_TEST_MODE == "xcom_object_storage":
compose_file_names.append("localstack.yml")
_setup_xcom_object_storage_integration(dot_env_file, tmp_dir)
+ elif E2E_TEST_MODE == "event_driven":
+ compose_file_names.extend(["kafka.yml", "providers-mount.yml"])
+ _setup_event_driven_integration(dot_env_file, tmp_dir)
#
# Please Do not use this Fernet key in any deployments! Please generate
your own key.
@@ -204,6 +299,10 @@ def spin_up_airflow_environment(tmp_path_factory:
pytest.TempPathFactory):
command=["airflow", "dags", "reserialize"],
service_name="airflow-dag-processor"
)
+ if E2E_TEST_MODE == "event_driven":
+ console.print("[yellow]Creating Kafka topics...")
+ _create_kafka_topics(_E2ETestState.compose_instance)
+
except Exception:
console.print("[red]Failed to start docker compose")
if _E2ETestState.compose_instance:
@@ -265,6 +364,12 @@ def pytest_sessionfinish(session: pytest.Session,
exitstatus: int | pytest.ExitC
_E2ETestState.compose_instance.stop()
@pytest.fixture(scope="session")
def compose_instance():
    """Provide the session's running Docker Compose instance to tests.

    Returns ``_E2ETestState.compose_instance``, the object the session-level
    environment setup stores after starting the compose stack.
    """
    return _E2ETestState.compose_instance
+
+
def generate_test_report(results):
"""Generate test report with json summary."""
report = {
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
index a2d99aeabab..7e27dd6edc6 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
@@ -47,3 +47,18 @@ AWS_INIT_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" /
"scripts" / "init-aws.
# s3 bucket name for XComObjectStorageBackend tests. This bucket will be
created in the `init-aws.sh` script that is run as part of the LocalStack
container initialization.
XCOM_BUCKET = "test-xcom-objectstorage-backend"
+
# Directory holding the Kafka broker helper script; its parent directory holds kafka.yml.
KAFKA_DIR_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / "kafka"

# In-tree provider sources live under PROVIDERS_ROOT_PATH on the host. They are
# mounted into the Airflow containers under PROVIDERS_MOUNT_CONTAINER_PATH, so that
# ``_PIP_ADDITIONAL_REQUIREMENTS`` installs the latest (possibly unreleased) provider
# code instead of the release published on PyPI.
PROVIDERS_ROOT_PATH = AIRFLOW_ROOT_PATH / "providers"
PROVIDERS_MOUNT_CONTAINER_PATH = "/opt/airflow-providers"

# Compose services that receive the provider source mounts.
AIRFLOW_SERVICES_FOR_PROVIDER_MOUNT = (
    "airflow-apiserver", "airflow-scheduler", "airflow-dag-processor",
    "airflow-worker", "airflow-triggerer",
)
diff --git
a/airflow-e2e-tests/tests/airflow_e2e_tests/dags/example_event_driven.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/dags/example_event_driven.py
new file mode 100644
index 00000000000..f8e61de37f5
--- /dev/null
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/dags/example_event_driven.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+from datetime import timedelta
+from typing import TYPE_CHECKING, cast
+
+import pendulum
+
+from airflow.providers.apache.kafka.operators.produce import
ProduceToTopicOperator
+from airflow.providers.common.messaging.triggers.msg_queue import
MessageQueueTrigger
+from airflow.sdk import Asset, AssetWatcher, dag, get_current_context, task
+
+if TYPE_CHECKING:
+ from airflow.sdk.execution_time.context import Context,
TriggeringAssetEventsAccessor
+
# Connection id used by the asset-watcher trigger and by both producer operators.
KAFKA_CONFIG_ID = "kafka_default"
# Topics watched by the trigger; the producer Dag writes to TOPICS[0].
TOPICS = ["fizz_buzz"]
# Dead-letter topic that ``handle_dlq`` republishes failed events to.
DLQ_TOPIC = "dlq"
# Retries for ``process_message`` before the DLQ branch gets its chance to run.
RETRY_COUNT = 3

# Airflow Kafka connection (matches what the e2e conftest exports):
# AIRFLOW_CONN_KAFKA_DEFAULT='{
#     "conn_type": "kafka",
#     "extra": {
#         "bootstrap.servers": "broker:29092",
#         "group.id": "kafka_default_group",
#         "security.protocol": "PLAINTEXT",
#         "enable.auto.commit": false,
#         "auto.offset.reset": "latest"
#     }
# }'
#
# Kafka commands (run inside the ``broker`` container) to verify messages are being produced:
#
# Create topics (the e2e conftest creates them the same way before the tests run)
# /bin/kafka-topics --bootstrap-server broker:29092 --create --topic fizz_buzz \
#     --partitions 1 --replication-factor 1 --if-not-exists
# /bin/kafka-topics --bootstrap-server broker:29092 --create --topic dlq \
#     --partitions 1 --replication-factor 1 --if-not-exists
#
# Get offsets for the topic to verify messages are being produced
# /bin/kafka-get-offsets --bootstrap-server broker:29092 --topic fizz_buzz
# /bin/kafka-get-offsets --bootstrap-server broker:29092 --topic dlq
#
# List consumer groups to verify our consumer group is being registered
# /bin/kafka-consumer-groups --bootstrap-server broker:29092 --list
#
# Get current offsets for the consumer group (the connection's ``group.id``) to verify
# messages are being consumed
# /bin/kafka-consumer-groups --bootstrap-server broker:29092 --describe --group kafka_default_group
+
SAMPLE_ORDERS = [
    {"order_id": "ORD-1001", "customer": "alice", "item": "widget", "quantity": 2, "price": 9.99},
    {"order_id": "ORD-1002", "customer": "bob", "item": "gadget", "quantity": 1, "price": 24.50},
    {"order_id": "ORD-1003", "customer": "carol", "item": "widget", "quantity": 5, "price": 9.99},
    {"order_id": "ORD-1004", "customer": "dave", "item": "doohickey", "quantity": 3, "price": 14.75},
    {"order_id": "ORD-1005", "customer": "eve", "item": "thingamajig", "quantity": 1, "price": 39.00},
    {"order_id": "ORD-1006", "customer": "frank", "item": "widget", "quantity": 10, "price": 9.99},
    {"order_id": "ORD-1007", "customer": "grace", "item": "gadget", "quantity": 2, "price": 24.50},
    {"order_id": "ORD-1008", "customer": "heidi", "item": "doohickey", "quantity": 1, "price": 14.75},
]


def producer_function():
    """Yield the ``(key, value)`` messages the producer Dag publishes.

    One message per entry in ``SAMPLE_ORDERS``: the key is the JSON-encoded order id
    and the value is the JSON-encoded order. A final, deliberately non-JSON message
    follows, so the consumer's processing fails and the DLQ path is exercised.
    """
    yield from ((json.dumps(entry["order_id"]), json.dumps(entry)) for entry in SAMPLE_ORDERS)
    # Not valid JSON: ``process_one_message`` cannot decode it.
    yield ("malformed_message", "malformed_message")
+
+
def process_one_message(message: str):
    """Decode one JSON order, print a one-line summary and return the decoded dict.

    Non-JSON input raises whatever ``json.loads`` raises. A missing order field
    raises ``KeyError``.
    """
    parsed = json.loads(message)
    qty = parsed["quantity"]
    line_total = qty * parsed["price"]
    print(f"Order {parsed['order_id']}: {qty}x {parsed['item']} = ${line_total:.2f}")
    return parsed
+
+
def handle_dlq():
    """Republish the current run's triggering asset events to the DLQ topic.

    Used as the ``producer_function`` of the ``handle_dlq`` task. For every event on
    ``kafka_cdc_asset`` in the current context, it yields a ``(key, value)`` pair:
    - key: the JSON-encoded asset URI;
    - value: a JSON document holding the serialized asset and the event's ``extra``.
    """
    context: Context = get_current_context()
    events: TriggeringAssetEventsAccessor = context["triggering_asset_events"]
    for asset_event in events[kafka_cdc_asset]:
        print(f"Handling failed message from event: {asset_event}")
        serialized_event = json.dumps(
            {
                "asset": asset_event.asset.model_dump(mode="json"),
                "extra": asset_event.extra,
            }
        )
        message_key = json.dumps(asset_event.asset.uri)
        yield (message_key, serialized_event)
+
+
# Airflow 3 example.
# A trigger that listens to an external message queue (Apache Kafka here): ``scheme``
# selects the Kafka implementation and the remaining keyword arguments are used by it.
trigger = MessageQueueTrigger(
    scheme="kafka",
    kafka_config_id=KAFKA_CONFIG_ID,
    topics=TOPICS,
    commit_offset=True,
    poll_interval=1,
    poll_timeout=1,
)

# An asset whose watcher runs the trigger above, so messages on the queue become
# asset events that schedule the consumer Dag.
kafka_cdc_asset = Asset(
    "kafka_cdc_asset",
    watchers=[AssetWatcher(name="kafka_cdc", trigger=trigger)],
)
+
+
# Consumer Dag, scheduled on ``kafka_cdc_asset``. The flow is:
# - ``process_message`` handles the triggering events (with RETRY_COUNT retries).
# - ``should_handle_dlq`` always runs afterwards (trigger_rule="all_done").
# - ``handle_dlq`` publishes to DLQ_TOPIC only when ``process_message`` left no XCom
#   result, i.e. it ultimately failed.
@dag(
    schedule=[kafka_cdc_asset],
    tags=["event-driven"],
)
def event_driven_consumer():
    # NOTE: the inner function names double as task ids, and ``should_handle_dlq`` pulls
    # XCom by the id "process_message", so these names must not change.

    @task(retries=RETRY_COUNT, retry_delay=timedelta(seconds=1))
    def process_message(**context) -> bool:
        # Each triggering asset event carries the Kafka message in ``extra["payload"]``.
        # Any processing error fails (and retries) the task.
        events: TriggeringAssetEventsAccessor = context["triggering_asset_events"]
        for asset_event in events[kafka_cdc_asset]:
            print(f"Asset event: {asset_event}")
            process_one_message(cast("str", asset_event.extra["payload"]))
        return True

    @task.short_circuit(trigger_rule="all_done")
    def should_handle_dlq(**context) -> bool:
        """Skip DLQ handling if processing succeeded."""
        # Success pushes True to XCom. A failure after all retries pushes nothing, so
        # the pull yields None and the short-circuit lets the DLQ task run.
        return context["ti"].xcom_pull(task_ids="process_message") is None

    processed = process_message()
    gate = should_handle_dlq()
    processed >> gate >> ProduceToTopicOperator(
        kafka_config_id=KAFKA_CONFIG_ID,
        task_id="handle_dlq",
        topic=DLQ_TOPIC,
        producer_function=handle_dlq,
    )


event_driven_consumer()
+
+
# Producer Dag: not scheduled (runs only when triggered). Its single task writes every
# message yielded by ``producer_function`` to TOPICS[0] via the KAFKA_CONFIG_ID connection.
@dag(
    description="Load Data to fizz_buzz topic",
    start_date=pendulum.datetime(2022, 11, 1),
    schedule=None,
    catchup=False,
    tags=["event-driven"],
)
def event_driven_producer():
    target_topic = TOPICS[0]
    ProduceToTopicOperator(
        task_id="produce_to_topic",
        kafka_config_id=KAFKA_CONFIG_ID,
        topic=target_topic,
        producer_function=producer_function,
    )


event_driven_producer()
diff --git
a/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py
index b0f6573b9dc..ff98eb18e6d 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py
@@ -101,7 +101,30 @@ class AirflowClient:
response.raise_for_status()
return response.json()
def get_dag(self, dag_id: str):
    """Fetch Dag *dag_id* with ``GET dags/{dag_id}`` and return the ``_make_request`` result."""
    dag_endpoint = f"dags/{dag_id}"
    return self._make_request(method="GET", endpoint=dag_endpoint)
+
def wait_for_dag(self, dag_id: str, timeout: int = 120, check_interval: int = 3):
    """Poll until *dag_id* is registered (parsed & serialized) and reachable via the API.

    On remote CI runners the Dag processor may need extra time to parse & serialize new
    Dag files, so calling ``un_pause_dag`` immediately can return a 404. Retry until the
    Dag exists or *timeout* seconds elapse.

    At least one request is always made, even with ``timeout <= 0``. The wait between
    attempts is capped so it never sleeps past the deadline.

    :return: The ``get_dag`` response for the Dag.
    :raises requests.HTTPError: For any HTTP error other than 404.
    :raises TimeoutError: If the Dag still returns 404 once *timeout* has elapsed.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            return self.get_dag(dag_id)
        except requests.HTTPError as exc:
            # Only "not registered yet" (404) is retryable; anything else is a real failure.
            if exc.response is None or exc.response.status_code != 404:
                raise
            last_error = exc
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"Dag {dag_id} was not registered within {timeout}s. Last error: {last_error}")
        time.sleep(min(check_interval, remaining))
+
def un_pause_dag(self, dag_id: str):
+ self.wait_for_dag(dag_id)
return self._make_request(
method="PATCH",
endpoint=f"dags/{dag_id}",
@@ -154,6 +177,13 @@ class AirflowClient:
endpoint=f"dags/{dag_id}/dagRuns/{run_id}/taskInstances",
)
def list_dag_runs(self, dag_id: str, limit: int = 100):
    """Return up to *limit* Dag runs of *dag_id* via ``GET dags/{dag_id}/dagRuns``."""
    query_string = f"limit={limit}"
    runs_endpoint = f"dags/{dag_id}/dagRuns?{query_string}"
    return self._make_request(method="GET", endpoint=runs_endpoint)
+
def get_task_logs(
self, dag_id: str, run_id: str, task_id: str, try_number: int = 1,
map_index: int | None = None
):
diff --git
a/airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/__init__.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/test_event_driven.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/test_event_driven.py
new file mode 100644
index 00000000000..34fc0092c78
--- /dev/null
+++
b/airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/test_event_driven.py
@@ -0,0 +1,229 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""E2E tests for the event-driven Dag pattern using Apache Kafka.
+
+The producer Dag sends 8 valid orders + 1 malformed message to the ``fizz_buzz``
+Kafka topic. The consumer Dag is scheduled on an Asset with a Kafka AssetWatcher,
+so each message triggers a separate consumer Dag run (9 total).
+"""
+
+from __future__ import annotations
+
+import time
+
+from airflow_e2e_tests.e2e_test_utils.clients import AirflowClient
+
+PRODUCER_DAG_ID = "event_driven_producer"
+CONSUMER_DAG_ID = "event_driven_consumer"
+
+# Messages the producer Dag writes to ``fizz_buzz``. The expectations below are
+# derived from these two numbers so they cannot drift out of sync.
+EXPECTED_VALID_ORDERS = 8
+EXPECTED_MALFORMED_MESSAGES = 1
+# Each message (valid or malformed) triggers a separate consumer Dag run.
+EXPECTED_CONSUMER_RUNS = EXPECTED_VALID_ORDERS + EXPECTED_MALFORMED_MESSAGES
+# Every produced message advances the ``fizz_buzz`` end-offset by one.
+EXPECTED_FIZZ_BUZZ_OFFSET = EXPECTED_CONSUMER_RUNS
+# Only the malformed message(s) end up on the ``dlq`` topic.
+EXPECTED_DLQ_OFFSET = EXPECTED_MALFORMED_MESSAGES
+
+
+def _parse_topic_offset(raw_output: str, topic: str) -> int:
+    """Parse ``kafka-get-offsets`` output and return the end-offset of *topic*.
+
+    Each relevant output line has the form ``topic:partition:offset``. If the
+    topic has several partitions, their end-offsets are summed. The result then
+    covers the whole topic instead of silently reporting only the first
+    partition. Lines for other topics, and lines that do not split into exactly
+    three ``:``-separated fields, are ignored.
+
+    :param raw_output: stdout of ``kafka-get-offsets``.
+    :param topic: name of the topic whose offset should be returned.
+    :return: sum of the per-partition end-offsets reported for *topic*.
+    :raises ValueError: if *raw_output* contains no line for *topic*.
+    """
+    offsets = []
+    for line in raw_output.strip().splitlines():
+        parts = line.strip().split(":")
+        if len(parts) == 3 and parts[0] == topic:
+            offsets.append(int(parts[2]))
+    if not offsets:
+        raise ValueError(f"Could not find offset for topic '{topic}' in output:\n{raw_output}")
+    return sum(offsets)
+
+
+class TestEventDrivenDag:
+    """Test the Kafka-based event-driven producer/consumer Dag pair."""
+
+    airflow_client = AirflowClient()
+
+    @staticmethod
+    def _exec_in_broker(compose_instance, command: list[str]) -> str:
+        """Run *command* inside the ``broker`` service and return its stdout as text."""
+        stdout, _, _ = compose_instance.exec_in_container(command=command, service_name="broker")
+        # exec_in_container may hand back either bytes or str; normalise to str.
+        return stdout.decode() if isinstance(stdout, bytes) else stdout
+
+    @staticmethod
+    def _terminal_runs(runs: list[dict]) -> list[dict]:
+        """Return the Dag runs in *runs* whose state is ``success`` or ``failed``."""
+        return [r for r in runs if r["state"] in ("success", "failed")]
+
+    @staticmethod
+    def _assert_ti_state(run_id: str, ti_states: dict[str, str], task_id: str, expected: str) -> None:
+        """Assert that *task_id* in consumer run *run_id* ended in state *expected*."""
+        actual = ti_states.get(task_id)
+        assert actual == expected, f"Run {run_id}: expected {task_id}={expected}, got '{actual}'"
+
+    def _wait_for_kafka_consumer_group(self, compose_instance, timeout: int = 60, check_interval: int = 3):
+        """Poll until any Kafka consumer group is registered, indicating the trigger is active.
+
+        :raises TimeoutError: if no consumer group is listed within *timeout* seconds.
+        """
+        start = time.monotonic()
+        while time.monotonic() - start < timeout:
+            output = self._exec_in_broker(
+                compose_instance,
+                ["kafka-consumer-groups", "--bootstrap-server", "broker:29092", "--list"],
+            )
+            # Any non-empty group listing means the trigger's consumer has registered.
+            if any(line.strip() for line in output.splitlines()):
+                return
+            time.sleep(check_interval)
+        raise TimeoutError(f"No Kafka consumer group registered within {timeout}s")
+
+    def _wait_for_consumer_dag_runs(
+        self, expected_count: int, timeout: int = 600, check_interval: int = 10
+    ) -> list[dict]:
+        """Poll until *expected_count* consumer Dag runs reach a terminal state.
+
+        :return: exactly *expected_count* terminal runs, most recently finished first.
+        :raises TimeoutError: if fewer than *expected_count* runs are terminal after *timeout*.
+        """
+        start = time.monotonic()
+        while time.monotonic() - start < timeout:
+            runs = self.airflow_client.list_dag_runs(CONSUMER_DAG_ID).get("dag_runs", [])
+            terminal_runs = self._terminal_runs(runs)
+            if len(terminal_runs) >= expected_count:
+                # Return only the most recent expected_count runs to avoid
+                # assertion failures if extra runs exist from retries or prior state.
+                terminal_runs.sort(
+                    key=lambda r: (
+                        r.get("end_date") or "",
+                        r.get("start_date") or "",
+                        r.get("logical_date") or "",
+                        r.get("dag_run_id") or "",
+                    ),
+                    reverse=True,
+                )
+                return terminal_runs[:expected_count]
+            time.sleep(check_interval)
+
+        # Timed out: take a fresh snapshot for the diagnostic message.
+        runs = self.airflow_client.list_dag_runs(CONSUMER_DAG_ID).get("dag_runs", [])
+        states = {r["dag_run_id"]: r["state"] for r in runs}
+        raise TimeoutError(
+            f"Expected {expected_count} terminal consumer Dag runs within {timeout}s, "
+            f"but only found {len(self._terminal_runs(runs))}. "
+            f"Run states: {states}"
+        )
+
+    def _get_topic_offset(self, compose_instance, topic: str) -> int:
+        """Return the current end-offset of *topic* via ``kafka-get-offsets`` inside the broker.
+
+        :raises ValueError: if the tool output contains no entry for *topic*.
+        """
+        output = self._exec_in_broker(
+            compose_instance,
+            ["kafka-get-offsets", "--bootstrap-server", "broker:29092", "--topic", topic],
+        )
+        return _parse_topic_offset(output, topic)
+
+    def _wait_for_topic_offset(
+        self, compose_instance, topic: str, expected_offset: int, timeout: int = 30, check_interval: int = 2
+    ) -> int:
+        """Poll until *topic* reaches *expected_offset*, then return the offset.
+
+        If ``kafka-get-offsets`` does not report *topic* yet, that poll counts as
+        "not ready" and is retried. It does not fail the test immediately.
+
+        :return: the last observed offset. It may be below *expected_offset* if
+            *timeout* elapsed, so callers should still assert on it.
+        :raises ValueError: if *topic* was still not reported when *timeout* elapsed.
+        """
+        start = time.monotonic()
+        while True:
+            try:
+                offset = self._get_topic_offset(compose_instance, topic)
+            except ValueError:
+                if time.monotonic() - start >= timeout:
+                    raise
+            else:
+                if offset >= expected_offset or time.monotonic() - start >= timeout:
+                    return offset
+            time.sleep(check_interval)
+
+    def _get_task_states(self, run_id: str) -> dict[str, str]:
+        """Return a mapping of task_id -> state for a consumer Dag run."""
+        response = self.airflow_client.get_task_instances(CONSUMER_DAG_ID, run_id)
+        return {ti["task_id"]: ti["state"] for ti in response["task_instances"]}
+
+    def test_producer_triggers_consumer_and_kafka_offsets(self, compose_instance):
+        """Trigger the producer once and verify the consumer runs and Kafka offsets.
+
+        Steps:
+        1. Unpause the consumer Dag so the triggerer starts the AssetWatcher.
+        2. Wait for the Kafka MessageQueueTrigger to begin polling.
+        3. Trigger the producer Dag and wait for it to succeed.
+        4. Wait for 9 consumer Dag runs to reach a terminal state.
+        5. Verify that all 9 Dag runs succeed.
+        6. Verify task-level behavior:
+           - 1 run has a failed ``process_message`` task and executes ``handle_dlq``.
+           - 8 runs succeed on ``process_message`` and skip ``handle_dlq``.
+        7. Verify the Kafka topic offsets: ``fizz_buzz`` reaches 9 (all messages
+           produced) and ``dlq`` reaches 1 (the malformed message).
+        """
+        # 1. Unpause consumer so the triggerer registers the AssetWatcher
+        self.airflow_client.un_pause_dag(CONSUMER_DAG_ID)
+
+        # 2. Wait for the triggerer to start the MessageQueueTrigger and subscribe.
+        #    The trigger uses poll_interval=1 and auto.offset.reset=latest so it
+        #    must be actively polling before the producer writes.
+        self._wait_for_kafka_consumer_group(compose_instance)
+
+        # 3. Trigger producer and wait for it to complete
+        producer_state = self.airflow_client.trigger_dag_and_wait(PRODUCER_DAG_ID)
+        assert producer_state == "success", f"Producer Dag did not succeed. Final state: {producer_state}"
+
+        # 4. Wait for all consumer Dag runs
+        consumer_runs = self._wait_for_consumer_dag_runs(expected_count=EXPECTED_CONSUMER_RUNS)
+        assert len(consumer_runs) == EXPECTED_CONSUMER_RUNS, (
+            f"Expected {EXPECTED_CONSUMER_RUNS} consumer runs, got {len(consumer_runs)}"
+        )
+
+        # 5. All consumer Dag runs should succeed
+        for run in consumer_runs:
+            assert run["state"] == "success", (
+                f"Expected all consumer runs to succeed, but run {run['dag_run_id']} "
+                f"has state '{run['state']}'"
+            )
+
+        # 6. Verify task-level behavior per run:
+        #    - malformed message: process_message fails, handle_dlq executes
+        #    - valid orders: process_message succeeds, handle_dlq is skipped
+        # Each DLQ run forwards one message, so the DLQ run count matches the DLQ offset.
+        expected_dlq_runs = EXPECTED_DLQ_OFFSET
+        expected_normal_runs = EXPECTED_CONSUMER_RUNS - EXPECTED_DLQ_OFFSET
+        dlq_runs = []
+        normal_runs = []
+        for run in consumer_runs:
+            run_id = run["dag_run_id"]
+            ti_states = self._get_task_states(run_id)
+            if ti_states.get("process_message") == "failed":
+                dlq_runs.append(run_id)
+                self._assert_ti_state(run_id, ti_states, "should_handle_dlq", "success")
+                self._assert_ti_state(run_id, ti_states, "handle_dlq", "success")
+            else:
+                normal_runs.append(run_id)
+                self._assert_ti_state(run_id, ti_states, "process_message", "success")
+                self._assert_ti_state(run_id, ti_states, "should_handle_dlq", "success")
+                self._assert_ti_state(run_id, ti_states, "handle_dlq", "skipped")
+
+        assert len(dlq_runs) == expected_dlq_runs, (
+            f"Expected exactly {expected_dlq_runs} run(s) with failed process_message (DLQ path), "
+            f"got {len(dlq_runs)}: {dlq_runs}"
+        )
+        assert len(normal_runs) == expected_normal_runs, (
+            f"Expected {expected_normal_runs} runs with successful process_message, "
+            f"got {len(normal_runs)}: {normal_runs}"
+        )
+
+        # 7. Verify Kafka topic offsets.
+        #    The DLQ message is produced by a consumer run that may have only just
+        #    finished, so kafka-get-offsets may briefly report a stale offset; poll
+        #    with a short timeout.
+        fizz_buzz_offset = self._wait_for_topic_offset(
+            compose_instance, "fizz_buzz", EXPECTED_FIZZ_BUZZ_OFFSET
+        )
+        assert fizz_buzz_offset == EXPECTED_FIZZ_BUZZ_OFFSET, (
+            f"Expected fizz_buzz offset {EXPECTED_FIZZ_BUZZ_OFFSET}, got {fizz_buzz_offset}"
+        )
+
+        dlq_offset = self._wait_for_topic_offset(compose_instance, "dlq", EXPECTED_DLQ_OFFSET)
+        assert dlq_offset == EXPECTED_DLQ_OFFSET, (
+            f"Expected dlq offset {EXPECTED_DLQ_OFFSET}, got {dlq_offset}"
+        )
diff --git a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
index 1add4b96322..b6a293c2ec9 100644
--- a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
+++ b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
@@ -135,7 +135,7 @@
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="312.8"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">│</text><text
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="312.8"
textLength="73.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">(TEXT)</text><text
class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="312.8"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">│</text><text
class="breez [...]
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="337.2"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-13)">│</text><text
class="breeze-testing-airflow-e2e-tests-r4" x="24.4" y="337.2" textLength="366"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-13)">--e2e-test-mode               </text><text
class="breeze-testing-airflow-e2e-tests-r1" x="463.6" y="337.2" textLen [...]
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="361.6"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">│</text><text
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="361.6"
textLength="976"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">(basic|remote_log|remote_log_elasticsearch|remote_log_opensearch|xcom_object_sto</text><text
class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="361.6"
textLength="12.2" clip-path="u [...]
-</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="386"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</text><text
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="386" textLength="61"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">rage)</text><text
class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="386"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</text><text
class="breeze-testing [...]
+</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="386"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</text><text
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="386"
textLength="219.6"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">rage|event_driven)</text><text
class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="386"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</text><text class
[...]
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="410.4"
textLength="1464"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-16)">╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯</text><text
class="breeze-testing-airflow-e2e-tests-r1" x="1464" y="410.4"
textLength="12.2" clip-path="url(#breeze-testing-airflow-e2e-tests-line-16)">
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="434.8"
textLength="24.4"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">╭─</text><text
class="breeze-testing-airflow-e2e-tests-r5" x="24.4" y="434.8"
textLength="195.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)"> Common options </text><text
class="breeze-testing-airflow-e2e-tests-r5" x="219.6" y="434.8"
textLength="1220" clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">─
[...]
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="459.2"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-18)">│</text><text
class="breeze-testing-airflow-e2e-tests-r4" x="24.4" y="459.2"
textLength="109.8"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-18)">--verbose</text><text
class="breeze-testing-airflow-e2e-tests-r6" x="158.6" y="459.2"
textLength="24.4"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-18)">-v</text><text
class="br [...]
diff --git a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
index 4648ab65601..264b35b577b 100644
--- a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
+++ b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
@@ -1 +1 @@
-0e0a5500aa42f7857d251cf13a1043ed
+19ec59301c48da362cdb7e2bcef8c89f
diff --git a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
index a29b1471ecf..bb39ae28a42 100644
--- a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
@@ -1413,7 +1413,14 @@ option_e2e_test_mode = click.option(
show_default=True,
envvar="E2E_TEST_MODE",
type=click.Choice(
- ["basic", "remote_log", "remote_log_elasticsearch",
"remote_log_opensearch", "xcom_object_storage"],
+ [
+ "basic",
+ "remote_log",
+ "remote_log_elasticsearch",
+ "remote_log_opensearch",
+ "xcom_object_storage",
+ "event_driven",
+ ],
case_sensitive=False,
),
)
diff --git a/dev/breeze/src/airflow_breeze/utils/selective_checks.py
b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
index 8c055baadfb..5e2a9169990 100644
--- a/dev/breeze/src/airflow_breeze/utils/selective_checks.py
+++ b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
@@ -123,6 +123,7 @@ class FileGroupForCi(Enum):
REMOTE_LOGGING_E2E_S3_FILES = auto()
REMOTE_LOGGING_E2E_ELASTICSEARCH_FILES = auto()
REMOTE_LOGGING_E2E_OPENSEARCH_FILES = auto()
+ EVENT_DRIVEN_E2E_FILES = auto()
ALL_PYPROJECT_TOML_FILES = auto()
ALL_PYTHON_FILES = auto()
ALL_SOURCE_FILES = auto()
@@ -204,6 +205,14 @@ CI_FILE_GROUP_MATCHES: HashableDict[FileGroupForCi] =
HashableDict(
r"^airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_opensearch_tests/.*",
r"^providers/opensearch/.*",
],
+ FileGroupForCi.EVENT_DRIVEN_E2E_FILES: [
+
r"^airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/.*",
+
r"^airflow-e2e-tests/tests/airflow_e2e_tests/dags/example_event_driven\.py$",
+ r"^airflow-e2e-tests/docker/kafka/.*",
+ r"^airflow-e2e-tests/docker/kafka\.yml$",
+ r"^providers/apache/kafka/.*",
+ r"^providers/common/messaging/.*",
+ ],
FileGroupForCi.PYTHON_PRODUCTION_FILES: [
r"^airflow-core/src/airflow/.*\.py",
r"^providers/.*\.py",
@@ -973,6 +982,10 @@ class SelectiveChecks:
FileGroupForCi.REMOTE_LOGGING_E2E_OPENSEARCH_FILES
)
+    @cached_property
+    def run_event_driven_e2e_tests(self) -> bool:
+        """Whether the event-driven (Kafka) E2E tests should run, per the EVENT_DRIVEN_E2E_FILES group."""
+        file_group = FileGroupForCi.EVENT_DRIVEN_E2E_FILES
+        return self._should_be_run(file_group)
+
@cached_property
def run_amazon_tests(self) -> bool:
if self.providers_test_types_list_as_strings_in_json == "[]":
@@ -1084,6 +1097,7 @@ class SelectiveChecks:
or self.run_remote_logging_s3_e2e_tests
or self.run_remote_logging_elasticsearch_e2e_tests
or self.run_remote_logging_opensearch_e2e_tests
+ or self.run_event_driven_e2e_tests
or self.run_ui_e2e_tests
)