This is an automated email from the ASF dual-hosted git repository.
wu-sheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new a144e537ce Add Apache Airflow monitoring layer (SWIP-7) (#13891)
a144e537ce is described below
commit a144e537ce764f96039b53e0d99019289dfc731c
Author: songzhendong <[email protected]>
AuthorDate: Thu Jun 11 07:56:21 2026 +0800
Add Apache Airflow monitoring layer (SWIP-7) (#13891)
---
.github/workflows/skywalking.yaml | 4 +
docs/en/changes/changes.md | 3 +
docs/en/concepts-and-designs/lal.md | 2 +-
docs/en/concepts-and-designs/mal.md | 2 +-
docs/en/concepts-and-designs/service-hierarchy.md | 9 +
.../en/setup/backend/backend-airflow-monitoring.md | 187 +++++++++++
.../horizon-airflow-component-scheduler.png | Bin 0 -> 84507 bytes
.../horizon-airflow-component-triggerer.png | Bin 0 -> 87149 bytes
.../images/airflow/horizon-airflow-service.png | Bin 0 -> 91722 bytes
.../airflow/horizon-infra-3d-map-airflow-dev.png | Bin 0 -> 210097 bytes
.../airflow/horizon-k8s-service-endpoints.png | Bin 0 -> 84772 bytes
.../airflow/horizon-k8s-service-instances.png | Bin 0 -> 100132 bytes
.../images/airflow/horizon-k8s-service-service.png | Bin 0 -> 96356 bytes
.../airflow/horizon-k8s-service-topology.png | Bin 0 -> 194432 bytes
docs/en/swip/SWIP-7.md | 97 ++++++
docs/en/swip/readme.md | 1 +
docs/menu.yml | 4 +
.../airflow/airflow-instance.data.yaml | 298 +++++++++++++++++
.../airflow/airflow-service.data.yaml | 174 ++++++++++
.../skywalking/oap/server/core/analysis/Layer.java | 5 +-
.../src/main/resources/application.yml | 2 +-
.../src/main/resources/hierarchy-definition.yml | 4 +
.../src/main/resources/layer-extensions.yml | 2 +-
.../otel-rules/airflow/airflow-instance.yaml | 53 +++
.../otel-rules/airflow/airflow-service.yaml | 44 +++
test/e2e-v2/cases/airflow/.gitignore | 19 ++
test/e2e-v2/cases/airflow/README.md | 74 +++++
.../cases/airflow/cluster/airflow-cases.yaml | 60 ++++
test/e2e-v2/cases/airflow/cluster/compose-env.sh | 45 +++
.../cases/airflow/cluster/dags/cluster_load.py | 34 ++
.../cases/airflow/cluster/dags/cluster_smoke.py | 31 ++
.../e2e-v2/cases/airflow/cluster/dags/e2e_asset.py | 47 +++
.../cases/airflow/cluster/dags/e2e_deferrable.py | 33 ++
.../cases/airflow/cluster/docker-compose.yml | 290 ++++++++++++++++
test/e2e-v2/cases/airflow/cluster/e2e.yaml | 42 +++
.../cases/airflow/cluster/expected/service.yml | 24 ++
.../airflow/cluster/otel-collector-config.yaml | 41 +++
test/e2e-v2/cases/airflow/cluster/seed-workload.sh | 106 ++++++
test/e2e-v2/cases/airflow/cluster/setup.sh | 26 ++
.../airflow/cluster/wait-scheduler-healthy.sh | 39 +++
.../cases/airflow/mock/Dockerfile.mock-sender | 32 ++
test/e2e-v2/cases/airflow/mock/airflow-cases.yaml | 77 +++++
test/e2e-v2/cases/airflow/mock/docker-compose.yml | 61 ++++
test/e2e-v2/cases/airflow/mock/e2e.yaml | 42 +++
.../cases/airflow/mock/expected/instance.yml | 27 ++
.../expected/metrics-has-value-label-poolname.yml | 34 ++
.../airflow/mock/expected/metrics-has-value.yml | 30 ++
.../e2e-v2/cases/airflow/mock/expected/service.yml | 24 ++
.../mock/mock-data/otel-airflow-metrics.json | 364 +++++++++++++++++++++
.../cases/airflow/mock/otlp_replay_server.py | 106 ++++++
.../cases/airflow/mock/requirements-replay.txt | 4 +
test/e2e-v2/cases/storage/expected/config-dump.yml | 2 +-
52 files changed, 2599 insertions(+), 6 deletions(-)
diff --git a/.github/workflows/skywalking.yaml
b/.github/workflows/skywalking.yaml
index 4b7bab0aaf..504dc19a7f 100644
--- a/.github/workflows/skywalking.yaml
+++ b/.github/workflows/skywalking.yaml
@@ -713,6 +713,10 @@ jobs:
config: test/e2e-v2/cases/kong/e2e.yaml
- name: Flink
config: test/e2e-v2/cases/flink/e2e.yaml
+ - name: Airflow
+ config: test/e2e-v2/cases/airflow/mock/e2e.yaml
+ - name: Airflow Cluster
+ config: test/e2e-v2/cases/airflow/cluster/e2e.yaml
- name: OTLP Trace
config: test/e2e-v2/cases/otlp-traces/e2e.yaml
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index d548d25cd5..a01478649c 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -276,6 +276,7 @@
* Fix: TTL query add metadata TTL.
* Fix: PersistentWorker used wrong TTL for metrics cache if the storage is
BanyanDB.
* Add iOS/iPadOS app monitoring via OpenTelemetry Swift SDK (SWIP-11).
Includes the `IOS` layer, `IOSHTTPSpanListener` for outbound HTTP client
metrics (supports OTel Swift `.old`/`.stable`/`.httpDup` semantic-convention
modes via stable-then-legacy attribute fallback), `IOSMetricKitSpanListener`
for daily MetricKit metrics (exit counts split by foreground/background,
app-launch / hang-time percentile histograms with finite 30 s overflow
ceiling), LAL rules for crash/hang diagnostics, Mo [...]
+* Add Apache Airflow monitoring via native OpenTelemetry metrics (SWIP-7). New
`AIRFLOW` layer with Service (cluster) and Instance (host) dimensions, MAL
rules under `otel-rules/airflow/` (**27** metrics), [setup
documentation](../setup/backend/backend-airflow-monitoring.md), mock OTLP e2e
(`cases/airflow/mock/e2e.yaml`: 2 entity + 27 metric checks, 29 total), and
real Celery-cluster integration smoke (`cases/airflow/cluster/e2e.yaml`: 2
entity + 14 metric checks, 16 total). See `test/e2 [...]
* Fix LAL `layer: auto` mode dropping logs after extractor set the layer.
Codegen now propagates `layer "..."` assignments to `LogMetadata.layer` so
`FilterSpec.doSink()` sees the script-decided layer.
* Fix MetricKit histogram percentile metrics being reported at 1000× their
true value — the listener now marks its `SampleFamily` with
`defaultHistogramBucketUnit(MILLISECONDS)` so MAL's default SECONDS→MS rescale
of `le` labels is not applied.
* Add WeChat and Alipay Mini Program monitoring via the SkyAPM
mini-program-monitor SDK (SWIP-12). Two new layers (`WECHAT_MINI_PROGRAM`,
`ALIPAY_MINI_PROGRAM`); two new JavaScript componentIds (`WeChat-MiniProgram:
10002`, `AliPay-MiniProgram: 10003`). Service / instance / endpoint entities
are produced by MAL + LAL, not trace analysis — mini-programs are client-side
(exit-only) so `RPCAnalysisListener` stays unchanged (same pattern as browser
and iOS). MAL rules per platform × scope un [...]
@@ -294,6 +295,7 @@
* Fix: continuous profiling policy validation now rejects a threshold / count
of `0` to match the error messages and rover's `value >= threshold` trigger
semantics (a `0` threshold would always trigger). CPU percent and HTTP error
rate are tightened from `[0-100]` to `(0-100]`.
#### UI
+* Add Airflow layer dashboards and menu i18n under Workflow Scheduler in
Horizon UI (SWIP-7).
* Add mobile menu icon and i18n labels for the iOS layer.
* Fix metric label rendering in multi-expression dashboard widgets.
* Add i18n menu labels for WeChat Mini Program and Alipay Mini Program (en /
zh / es) — sub-menus rendered as raw keys until this bump.
@@ -301,6 +303,7 @@
#### Documentation
* Update LAL documentation with `sourceAttribute()` function and `layer: auto`
mode.
+* Add Airflow monitoring setup documentation (SWIP-7).
* Add iOS app monitoring setup documentation.
* Add WeChat / Alipay Mini Program monitoring setup documentation, plus a
client-side-monitoring section in the security guide covering public-internet
ingress (OTLP + `/v3/segments`) for mobile / browser / mini-program SDKs.
* Improve downsampling documentation
diff --git a/docs/en/concepts-and-designs/lal.md
b/docs/en/concepts-and-designs/lal.md
index 5a44925177..74acc3f7e3 100644
--- a/docs/en/concepts-and-designs/lal.md
+++ b/docs/en/concepts-and-designs/lal.md
@@ -62,7 +62,7 @@ Notes:
multiple LAL files (and additionally in a MAL file, in
`layer-extensions.yml`, or via the
`LayerExtension` SPI). Conflicting registrations cause OAP boot to fail
loudly with the
offending file in the stack trace.
-- **Ordinals 0–49** are in active use by the OAP distribution's built-in
layers; **50–999**
+- **Ordinals 0–50** are in active use by the OAP distribution's built-in
layers; **51–999**
are reserved by convention for future built-ins. External layers should
start at `>= 1000`
— enforcement is not strict, but staying above the reserved band avoids
upgrade-time
collisions.
diff --git a/docs/en/concepts-and-designs/mal.md
b/docs/en/concepts-and-designs/mal.md
index 85373db6dd..5d8c92be2e 100644
--- a/docs/en/concepts-and-designs/mal.md
+++ b/docs/en/concepts-and-designs/mal.md
@@ -405,7 +405,7 @@ Notes:
`LayerExtension` SPI). Conflicting registrations (same name with different
ordinal, or same
ordinal with different name) cause OAP boot to fail loudly with the
offending file in the
stack trace.
-- **Ordinals 0–49** are in active use by the OAP distribution's built-in
layers; **50–999** are
+- **Ordinals 0–50** are in active use by the OAP distribution's built-in
layers; **51–999** are
reserved by convention for future built-ins. External layers should start at
`>= 1000` —
enforcement is not strict, but staying above the reserved band avoids
upgrade-time collisions.
diff --git a/docs/en/concepts-and-designs/service-hierarchy.md
b/docs/en/concepts-and-designs/service-hierarchy.md
index 5f3c5144fc..12b2918d54 100644
--- a/docs/en/concepts-and-designs/service-hierarchy.md
+++ b/docs/en/concepts-and-designs/service-hierarchy.md
@@ -38,6 +38,7 @@ If you want to customize it according to your own needs,
please refer to [Servic
| PULSAR | K8S_SERVICE | [PULSAR On
K8S_SERVICE](#pulsar-on-k8s_service) |
| SO11Y_OAP | K8S_SERVICE | [SO11Y_OAP On
K8S_SERVICE](#so11y_oap-on-k8s_service) |
| KONG | K8S_SERVICE | [KONG On K8S_SERVICE](#kong-on-k8s_service)
|
+| AIRFLOW | K8S_SERVICE | [AIRFLOW On
K8S_SERVICE](#airflow-on-k8s_service) |
- The following sections will describe the **default matching rules** in
detail and use the `upper-layer On lower-layer` format.
- The example service names are based on SkyWalking
[Showcase](https://github.com/apache/skywalking-showcase) default deployment.
@@ -229,6 +230,14 @@ If you want to customize it according to your own needs,
please refer to [Servic
- KONG.service.name: `kong::kong.skywalking-showcase`
- K8S_SERVICE.service.name: `skywalking-showcase::kong.skywalking-showcase`
+#### AIRFLOW On K8S_SERVICE
+- Rule name: `short-name`
+- Matching expression: `{ (u, l) -> u.shortName == l.shortName }`
+- Description: AIRFLOW.service.shortName == K8S_SERVICE.service.shortName
+- Matched Example:
+ - AIRFLOW.service.name: `airflow::airflow.skywalking-showcase`
+ - K8S_SERVICE.service.name:
`skywalking-showcase::airflow.skywalking-showcase`
+
### Build Through Specific Agents
Use agent tech involved(such as eBPF) and deployment tools(such as operator
and agent injector) to detect the service hierarchy relations.
diff --git a/docs/en/setup/backend/backend-airflow-monitoring.md
b/docs/en/setup/backend/backend-airflow-monitoring.md
new file mode 100644
index 0000000000..ab157d1502
--- /dev/null
+++ b/docs/en/setup/backend/backend-airflow-monitoring.md
@@ -0,0 +1,187 @@
+# Airflow monitoring
+
+SkyWalking ingests **Apache Airflow 3.x** metrics from Airflow's native
OpenTelemetry exporter via
+the [OpenTelemetry receiver](opentelemetry-receiver.md), aggregates them with
+[MAL](../../concepts-and-designs/mal.md), and shows them in Horizon UI under
**Workflow Scheduler →
+Airflow**.
+
+## Data flow
+
+1. Airflow exports metrics via native OpenTelemetry (`otel_on` /
`OTEL_EXPORTER_OTLP_*`) from
+ **scheduler** and **triggerer**.
+2. OpenTelemetry Collector receives OTLP metrics and forwards them to
SkyWalking OAP.
+3. OAP aggregates metrics with [MAL](../../concepts-and-designs/mal.md).
+4. Horizon UI displays them under **Workflow Scheduler → Airflow**.
+
+## Setup
+
+### 1. Enable Airflow OpenTelemetry metrics
+
+Install the OTel extra and point Airflow's OTLP exporter at your Collector. See
+[Airflow metrics documentation](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/metrics.html).
+
+```bash
+pip install 'apache-airflow[otel]'
+
+export OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318
+export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
+export OTEL_RESOURCE_ATTRIBUTES=cluster=prod-airflow
+```
+
+Set `AIRFLOW__METRICS__OTEL_ON=True` on **scheduler** and **triggerer**.
+
+Required OTLP resource attributes:
+
+| Attribute | Purpose |
+|-----------|---------|
+| `cluster` | Names the Service (`airflow::{cluster}`) |
+| `host.name` | Identifies scheduler or triggerer (UI **Components** tab) |
+
+### 2. OpenTelemetry Collector
+
+Forward OTLP metrics to OAP. Example pipeline:
+
+```yaml
+receivers:
+ otlp:
+ protocols:
+ http:
+ endpoint: 0.0.0.0:4318
+ grpc:
+ endpoint: 0.0.0.0:4317
+
+processors:
+ batch:
+
+exporters:
+ otlp:
+ endpoint: oap:11800
+ tls:
+ insecure: true
+
+service:
+ pipelines:
+ metrics:
+ receivers: [otlp]
+ processors: [batch]
+ exporters: [otlp]
+```
+
+Full example:
[`cluster/otel-collector-config.yaml`](../../../../test/e2e-v2/cases/airflow/cluster/otel-collector-config.yaml).
+Do not hard-code service or instance names in Collector processors — derive
them from Airflow's
+resource attributes.
+
+### 3. SkyWalking OAP
+
+Ensure `airflow/*` is in `SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES`
(enabled by default).
+
+## Entity model
+
+| SkyWalking entity | Mapping |
+|-------------------|---------|
+| Service | `airflow::{cluster}` from resource `cluster` |
+| Instance (OAP) / **Components** (UI) | Scheduler or triggerer from
`host.name` |
+
+### Components vs SkyWalking Instance vs Airflow Task Instance
+
+OAP stores scheduler/triggerer hosts as **Instance**; Horizon UI labels the
tab **Components** so
+operators do not confuse it with an Airflow **Task Instance** (one task run
inside a DAG run).
+
+| Term | Meaning |
+|------|---------|
+| SkyWalking **Service** | One Airflow cluster (`airflow::{cluster}`) |
+| **Components** (UI) / **Instance** (OAP) | Long-running scheduler or
triggerer (`host.name`) |
+| Airflow **Task Instance** | Single task execution — **not** on this
dashboard |
+
+Service panels aggregate cluster-wide samples. Component panels are scoped per
`host.name`.
+
+On Kubernetes, run a Collector **sidecar** per monitored pod; Airflow pushes
to `localhost:4318` and
+the sidecar forwards to OAP. Set `cluster` via `OTEL_RESOURCE_ATTRIBUTES`;
`host.name` comes from
+the pod hostname.
+
+## Supported metrics
+
+MAL rules: `otel-rules/airflow/airflow-service.yaml` and
`airflow-instance.yaml`.
+Asset counters use `airflow.asset.*` (Airflow 3.x only). Data source: Airflow
native
+OpenTelemetry export.
+
+### Airflow Service Supported Metrics
+
+| Monitoring Panel | Unit | Metric Name | Description |
+|------------------|------|-------------|-------------|
+| Tasks Executable | count | meter_airflow_scheduler_tasks_executable | Tasks
ready for execution |
+| Running Tasks | count | meter_airflow_executor_running_tasks | Tasks
currently running on executor |
+| Queued Tasks | count | meter_airflow_executor_queued_tasks | Queued tasks on
executor |
+| Scheduled Slots | count | meter_airflow_pool_scheduled_slots | Scheduled but
not yet running slots in pool |
+| Executor Open Slots | count | meter_airflow_executor_open_slots | Open
executor slots |
+| DAG File Queue Size | count | meter_airflow_dag_file_queue_size | DAG files
pending scan |
+| DAG Import Errors | count | meter_airflow_dag_import_errors | DAG files that
failed to parse |
+| DAG Bag Size | count | meter_airflow_dagbag_size | DAGs found in the last
scheduler scan |
+| DAG Total Parse Time | seconds | meter_airflow_dag_total_parse_time | Time
to scan and import queued DAG files |
+| DAG File Refresh Errors | count/min | meter_airflow_dag_file_refresh_error |
DAG file load failures per minute |
+| Asset Updates | count/min | meter_airflow_asset_updates | Updated assets per
minute |
+
+### Airflow Instance Supported Metrics
+
+| Monitoring Panel | Unit | Metric Name | Description |
+|------------------|------|-------------|-------------|
+| Pool Open / Deferred / Running Slots | count |
meter_airflow_instance_pool_open_slots,
meter_airflow_instance_pool_deferred_slots,
meter_airflow_instance_pool_running_slots | Pool capacity on the scheduler |
+| Running Tasks / Scheduled Slots | count | meter_airflow_instance_executor_running_tasks, meter_airflow_instance_pool_scheduled_slots | Tasks currently running on the executor and pool slots scheduled but not yet running |
+| Scheduler Heartbeat | count/min | meter_airflow_instance_scheduler_heartbeat
| Scheduler heartbeats per minute |
+| Executor Open / Queued Slots | count |
meter_airflow_instance_executor_open_slots,
meter_airflow_instance_executor_queued_tasks | Executor capacity and queue
depth on the scheduler |
+| Asset Updates | count/min | meter_airflow_instance_asset_updates | Asset
updates on this host |
+| Asset Triggered DagRuns | count/min |
meter_airflow_instance_asset_triggered_dagruns | DagRuns triggered by assets |
+| Triggerer Heartbeat | count/min | meter_airflow_instance_triggerer_heartbeat
| Triggerer heartbeats per minute |
+| Triggers Running / Capacity Left | count |
meter_airflow_instance_triggers_running,
meter_airflow_instance_triggerer_capacity_left | Live deferrable trigger load
on the triggerer |
+| Triggers Blocked / Failed / Succeeded | count/min |
meter_airflow_instance_triggers_blocked_main_thread,
meter_airflow_instance_triggers_failed,
meter_airflow_instance_triggers_succeeded | Deferred trigger outcomes on the
triggerer |
+
+Panel-to-metric mapping details: [SWIP-7](../../swip/SWIP-7.md).
+
+## Verification
+
+Run the e2e suites (same as CI):
+
+| Suite | Command | Checks |
+|-------|---------|--------|
+| Mock (full MAL contract) | `e2e run -c
test/e2e-v2/cases/airflow/mock/e2e.yaml` | 29 |
+| Cluster (native OTel smoke) | `e2e run -c
test/e2e-v2/cases/airflow/cluster/e2e.yaml` | 16 |
+
+CI runs both via
[`.github/workflows/skywalking.yaml`](../../../../.github/workflows/skywalking.yaml)
+(**Airflow** and **Airflow Cluster** matrix jobs) after `make docker.all`.
+
+Details: [e2e README](../../../../test/e2e-v2/cases/airflow/README.md).
+
+## Horizon UI
+
+Open **Workflow Scheduler → Airflow** after OAP ingests metrics.
+
+When Airflow is linked to `K8S_SERVICE` via [service
hierarchy](../../concepts-and-designs/service-hierarchy.md),
+start from the **3D Infrastructure Map** (middleware tier) and drill down into
**Kubernetes Services**:
+
+
+
+**Service** — cluster KPIs and DAG processing trends (11 panels).
+
+
+
+**Components** — per-host scheduler (**six** widgets) or triggerer (**three**
widgets); the
+template defines nine widgets and each host shows only metrics present in OAP
for that role.
+
+
+
+
+
+**Kubernetes Services** — service, instances, endpoints, and topology for the
linked `K8S_SERVICE`:
+
+
+
+
+
+
+
+
+
+## Customization
+
+Override MAL rules under `otel-rules/airflow/` or extend Horizon UI
dashboards. Restart OAP after
+rule changes.
diff --git
a/docs/en/setup/backend/images/airflow/horizon-airflow-component-scheduler.png
b/docs/en/setup/backend/images/airflow/horizon-airflow-component-scheduler.png
new file mode 100644
index 0000000000..c4ec3ccbb9
Binary files /dev/null and
b/docs/en/setup/backend/images/airflow/horizon-airflow-component-scheduler.png
differ
diff --git
a/docs/en/setup/backend/images/airflow/horizon-airflow-component-triggerer.png
b/docs/en/setup/backend/images/airflow/horizon-airflow-component-triggerer.png
new file mode 100644
index 0000000000..fd073ce380
Binary files /dev/null and
b/docs/en/setup/backend/images/airflow/horizon-airflow-component-triggerer.png
differ
diff --git a/docs/en/setup/backend/images/airflow/horizon-airflow-service.png
b/docs/en/setup/backend/images/airflow/horizon-airflow-service.png
new file mode 100644
index 0000000000..3cddee01bb
Binary files /dev/null and
b/docs/en/setup/backend/images/airflow/horizon-airflow-service.png differ
diff --git
a/docs/en/setup/backend/images/airflow/horizon-infra-3d-map-airflow-dev.png
b/docs/en/setup/backend/images/airflow/horizon-infra-3d-map-airflow-dev.png
new file mode 100644
index 0000000000..c5b831b780
Binary files /dev/null and
b/docs/en/setup/backend/images/airflow/horizon-infra-3d-map-airflow-dev.png
differ
diff --git
a/docs/en/setup/backend/images/airflow/horizon-k8s-service-endpoints.png
b/docs/en/setup/backend/images/airflow/horizon-k8s-service-endpoints.png
new file mode 100644
index 0000000000..8a5ef9a661
Binary files /dev/null and
b/docs/en/setup/backend/images/airflow/horizon-k8s-service-endpoints.png differ
diff --git
a/docs/en/setup/backend/images/airflow/horizon-k8s-service-instances.png
b/docs/en/setup/backend/images/airflow/horizon-k8s-service-instances.png
new file mode 100644
index 0000000000..ab6c5a809e
Binary files /dev/null and
b/docs/en/setup/backend/images/airflow/horizon-k8s-service-instances.png differ
diff --git
a/docs/en/setup/backend/images/airflow/horizon-k8s-service-service.png
b/docs/en/setup/backend/images/airflow/horizon-k8s-service-service.png
new file mode 100644
index 0000000000..7fcb864338
Binary files /dev/null and
b/docs/en/setup/backend/images/airflow/horizon-k8s-service-service.png differ
diff --git
a/docs/en/setup/backend/images/airflow/horizon-k8s-service-topology.png
b/docs/en/setup/backend/images/airflow/horizon-k8s-service-topology.png
new file mode 100644
index 0000000000..82a235d8f0
Binary files /dev/null and
b/docs/en/setup/backend/images/airflow/horizon-k8s-service-topology.png differ
diff --git a/docs/en/swip/SWIP-7.md b/docs/en/swip/SWIP-7.md
new file mode 100644
index 0000000000..605b5caf27
--- /dev/null
+++ b/docs/en/swip/SWIP-7.md
@@ -0,0 +1,97 @@
+# Support Apache Airflow Monitoring
+
+## Motivation
+
+Apache Airflow is an open-source workflow management platform primarily used for scheduling and
+monitoring workflows. It is used to orchestrate complex data pipelines and is widely adopted in
+data engineering and data science. Airflow lets users define workflows as DAGs (Directed Acyclic
+Graphs). Each DAG contains a series of tasks that run in a specific order according to their
+dependencies. Because Airflow runs many concurrent tasks in complex scenarios, monitoring its
+health and operational status is crucial. Airflow's metrics help operators analyze task health,
+plan optimizations, and design risk-prevention strategies.
+
+## Architecture Graph
+
+```mermaid
+graph LR;
+ AirflowOTEL("Airflow OTEL") --> OpenTelemetryCollector("OpenTelemetry
Collector") --> SkyWalkingOTELReceiver("SkyWalking OTEL Receiver") -->
SkyWalkingMALEngine("SkyWalking MAL Engine") --> HorizonUI("Horizon UI")
+```
+
+## Proposed Changes
+
+1. Airflow exports metrics via native OpenTelemetry (`otel_on` /
`OTEL_EXPORTER_OTLP_*`).
+2. The OpenTelemetry Collector receives OTLP metrics from Airflow and forwards them to the SkyWalking
+   OTel Receiver via the Collector's OTLP exporter.
+3. The SkyWalking OAP Server parses expressions with
[MAL](../concepts-and-designs/mal.md) to
+ filter, calculate, aggregate, and store the results.
+4. Metrics are displayed via [Horizon
UI](https://github.com/apache/skywalking-horizon-ui) under the
+ **Workflow Scheduler** menu group and can be customized on dashboards.
+
+SkyWalking models an Airflow deployment as `Layer: AIRFLOW`:
+
+- **Service** — one logical cluster (`airflow::{cluster}`), keyed by resource
attribute `cluster`.
+- **Instance** — scheduler or triggerer host (`host.name` resource attribute).
+
+Horizon labels this entity **Components** rather than **Instance** so
operators are not led to
+confuse it with Airflow **Task Instance** (a single task execution within one
DAG run). See
+[Airflow monitoring
setup](../setup/backend/backend-airflow-monitoring.md#components-vs-skywalking-instance-vs-airflow-task-instance)
+for the full naming rationale.
+
+### Airflow Service Supported Metrics
+
+| Monitoring Panel | Unit | Metric Name | Description |
+|------------------|------|-------------|-------------|
+| Tasks Executable | count | meter_airflow_scheduler_tasks_executable | Tasks
ready for execution |
+| Running Tasks | count | meter_airflow_executor_running_tasks | Tasks
currently running on executor |
+| Queued Tasks | count | meter_airflow_executor_queued_tasks | Queued tasks on
executor |
+| Scheduled Slots | count | meter_airflow_pool_scheduled_slots | Scheduled but
not yet running slots in pool (aggregated across pools via `aggregate_labels`
in the UI KPI card) |
+| Executor Open Slots | count | meter_airflow_executor_open_slots | Open
executor slots |
+| DAG File Queue Size | count | meter_airflow_dag_file_queue_size | DAG files
pending scan |
+| DAG Import Errors | count | meter_airflow_dag_import_errors | DAG files that
failed to parse |
+| DAG Bag Size | count | meter_airflow_dagbag_size | DAGs found in the last
scheduler scan |
+| DAG Total Parse Time | seconds | meter_airflow_dag_total_parse_time | Time
to scan and import queued DAG files |
+| DAG File Refresh Errors | count/min | meter_airflow_dag_file_refresh_error |
DAG file load failures per minute |
+| Asset Updates | count/min | meter_airflow_asset_updates | Updated assets per
minute |
+
+### Airflow Instance Supported Metrics
+
+| Monitoring Panel | Unit | Metric Name | Description |
+|------------------|------|-------------|-------------|
+| Pool Open / Deferred / Running Slots | count |
meter_airflow_instance_pool_open_slots,
meter_airflow_instance_pool_deferred_slots,
meter_airflow_instance_pool_running_slots | Pool capacity on the scheduler |
+| Running Tasks / Scheduled Slots | count | meter_airflow_instance_executor_running_tasks, meter_airflow_instance_pool_scheduled_slots | Tasks currently running on the executor and pool slots scheduled but not yet running |
+| Scheduler Heartbeat | count/min | meter_airflow_instance_scheduler_heartbeat
| Scheduler heartbeats per minute |
+| Executor Open / Queued Slots | count |
meter_airflow_instance_executor_open_slots,
meter_airflow_instance_executor_queued_tasks | Executor capacity and queue
depth on the scheduler |
+| Asset Updates | count/min | meter_airflow_instance_asset_updates | Asset
updates on this host |
+| Asset Triggered DagRuns | count/min |
meter_airflow_instance_asset_triggered_dagruns | DagRuns triggered by assets |
+| Triggerer Heartbeat | count/min | meter_airflow_instance_triggerer_heartbeat
| Triggerer heartbeats per minute |
+| Triggers Running / Capacity Left | count |
meter_airflow_instance_triggers_running,
meter_airflow_instance_triggerer_capacity_left | Live deferrable trigger load
on the triggerer |
+| Triggers Blocked / Failed / Succeeded | count/min |
meter_airflow_instance_triggers_blocked_main_thread,
meter_airflow_instance_triggers_failed,
meter_airflow_instance_triggers_succeeded | Deferred trigger outcomes on the
triggerer |
+
+Service-level panels aggregate cluster-wide samples. Instance-level panels are
scoped per
+`host.name` (shown as **Components** in the UI). Do not sum instance-scoped
samples into service dashboards when each component
+exports the same instrument independently.
+
+**Scheduler** and **triggerer** components use Airflow core OpenTelemetry
export only.
+
+**Airflow 3.x only.** Asset metrics use OTel instruments `airflow.asset.*`
(Airflow 2.x
+`airflow.dataset.*` is not supported).
+
+Bundled Horizon UI dashboards chart the metrics above one-to-one (**27** MAL
metrics: 11 Service +
+16 Instance). **Tasks Executable** is Service-only
(`meter_airflow_scheduler_tasks_executable`).
+The Service dashboard has **11** panels. The **Components** template defines
**nine** widgets;
+scheduler hosts show **six**, triggerer hosts show **three** (see
+[setup doc](../setup/backend/backend-airflow-monitoring.md#horizon-ui)).
+
+## Imported Dependencies libs and their licenses.
+
+No new dependency.
+
+## Compatibility
+
+No breaking changes.
+
+## General usage docs
+
+See [Airflow monitoring setup](../setup/backend/backend-airflow-monitoring.md)
and
+[e2e coverage matrix](../../../test/e2e-v2/cases/airflow/README.md).
diff --git a/docs/en/swip/readme.md b/docs/en/swip/readme.md
index 7f43cb51e5..eac834e4a1 100644
--- a/docs/en/swip/readme.md
+++ b/docs/en/swip/readme.md
@@ -83,6 +83,7 @@ Next SWIP Number: 16
- [SWIP-10 Support Envoy AI Gateway Observability](SWIP-10/SWIP.md)
- [SWIP-9 Support Flink Monitoring](SWIP-9.md)
- [SWIP-8 Support Kong Monitoring](SWIP-8.md)
+- [SWIP-7 Support Apache Airflow Monitoring](SWIP-7.md)
- [SWIP-6 Support ActiveMQ Monitoring](SWIP-6.md)
- [SWIP-5 Support ClickHouse Monitoring](SWIP-5.md)
- [SWIP-4 Support available layers of service in the topology](SWIP-4.md)
diff --git a/docs/menu.yml b/docs/menu.yml
index 169644a4d5..542640bf65 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -152,6 +152,10 @@ catalog:
path: "/en/setup/backend/backend-rocketmq-monitoring"
- name: "ActiveMQ"
path: "/en/setup/backend/backend-activemq-monitoring"
+ - name: "Workflow Scheduler"
+ catalog:
+ - name: "Airflow"
+ path: "/en/setup/backend/backend-airflow-monitoring"
- name: "Data Processing Engine"
catalog:
- name: "Flink"
diff --git
a/oap-server/analyzer/meter-analyzer-scripts-test/src/test/resources/scripts/mal/test-otel-rules/airflow/airflow-instance.data.yaml
b/oap-server/analyzer/meter-analyzer-scripts-test/src/test/resources/scripts/mal/test-otel-rules/airflow/airflow-instance.data.yaml
new file mode 100644
index 0000000000..c2e9ce246f
--- /dev/null
+++
b/oap-server/analyzer/meter-analyzer-scripts-test/src/test/resources/scripts/mal/test-otel-rules/airflow/airflow-instance.data.yaml
@@ -0,0 +1,298 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+script:
oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-instance.yaml
+input:
+ airflow_pool_scheduled_slots:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-scheduler
+ pool_name: default_pool
+ value: 1.0
+ airflow_pool_open_slots:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-scheduler
+ pool_name: default_pool
+ value: 128.0
+ airflow_pool_deferred_slots:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-scheduler
+ pool_name: default_pool
+ value: 0.0
+ airflow_pool_running_slots:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-scheduler
+ pool_name: default_pool
+ value: 0.0
+ airflow_scheduler_heartbeat:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-scheduler
+ value: 6.0
+ airflow_executor_open_slots:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-scheduler
+ value: 16.0
+ airflow_executor_queued_tasks:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-scheduler
+ value: 3.0
+ airflow_triggers_running:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-triggerer
+ value: 2.0
+ airflow_triggerer_capacity_left:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-triggerer
+ value: 998.0
+ airflow_triggerer_heartbeat:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-triggerer
+ value: 5.0
+ airflow_triggers_blocked_main_thread:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-triggerer
+ value: 0.0
+ airflow_triggers_failed:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-triggerer
+ value: 0.0
+ airflow_triggers_succeeded:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-triggerer
+ value: 3.0
+ airflow_executor_running_tasks:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-scheduler
+ value: 1.0
+ airflow_asset_updates:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-scheduler
+ value: 4.0
+ airflow_asset_triggered_dagruns:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ host_name: airflow-scheduler
+ value: 2.0
+expected:
+ meter_airflow_instance_pool_scheduled_slots:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-scheduler
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-scheduler
+ pool_name: default_pool
+ value: 1.0
+ meter_airflow_instance_pool_open_slots:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-scheduler
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-scheduler
+ pool_name: default_pool
+ value: 128.0
+ meter_airflow_instance_pool_deferred_slots:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-scheduler
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-scheduler
+ pool_name: default_pool
+ value: 0.0
+ meter_airflow_instance_pool_running_slots:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-scheduler
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-scheduler
+ pool_name: default_pool
+ value: 0.0
+ meter_airflow_instance_scheduler_heartbeat:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-scheduler
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-scheduler
+ value: 3.0
+ meter_airflow_instance_executor_open_slots:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-scheduler
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-scheduler
+ value: 16.0
+ meter_airflow_instance_executor_queued_tasks:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-scheduler
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-scheduler
+ value: 3.0
+ meter_airflow_instance_triggerer_heartbeat:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-triggerer
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-triggerer
+ value: 2.5
+ meter_airflow_instance_triggers_running:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-triggerer
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-triggerer
+ value: 2.0
+ meter_airflow_instance_triggerer_capacity_left:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-triggerer
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-triggerer
+ value: 998.0
+ meter_airflow_instance_triggers_blocked_main_thread:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-triggerer
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-triggerer
+ value: 0.0
+ meter_airflow_instance_triggers_failed:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-triggerer
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-triggerer
+ value: 0.0
+ meter_airflow_instance_triggers_succeeded:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-triggerer
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-triggerer
+ value: 1.5
+ meter_airflow_instance_executor_running_tasks:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-scheduler
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-scheduler
+ value: 1.0
+ meter_airflow_instance_asset_updates:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-scheduler
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-scheduler
+ value: 2.0
+ meter_airflow_instance_asset_triggered_dagruns:
+ entities:
+ - scope: SERVICE_INSTANCE
+ service: 'airflow::airflow-cluster'
+ instance: airflow-scheduler
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ host_name: airflow-scheduler
+ value: 1.0
diff --git
a/oap-server/analyzer/meter-analyzer-scripts-test/src/test/resources/scripts/mal/test-otel-rules/airflow/airflow-service.data.yaml
b/oap-server/analyzer/meter-analyzer-scripts-test/src/test/resources/scripts/mal/test-otel-rules/airflow/airflow-service.data.yaml
new file mode 100644
index 0000000000..a2025934e1
--- /dev/null
+++
b/oap-server/analyzer/meter-analyzer-scripts-test/src/test/resources/scripts/mal/test-otel-rules/airflow/airflow-service.data.yaml
@@ -0,0 +1,174 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+script:
oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-service.yaml
+input:
+ airflow_scheduler_tasks_executable:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ value: 5.0
+ airflow_executor_queued_tasks:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ value: 3.0
+ airflow_executor_running_tasks:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ value: 2.0
+ airflow_executor_open_slots:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ value: 16.0
+ airflow_pool_scheduled_slots:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ pool_name: default_pool
+ value: 1.0
+ airflow_dag_processing_file_path_queue_size:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ value: 4.0
+ airflow_dag_processing_import_errors:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ value: 0.0
+ airflow_dagbag_size:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ value: 42.0
+ airflow_dag_processing_total_parse_time:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ value: 3.5
+ airflow_dag_file_refresh_error:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ value: 1.0
+ airflow_asset_updates:
+ - labels:
+ cluster: airflow-cluster
+ job_name: Airflow
+ value: 7.0
+expected:
+ meter_airflow_scheduler_tasks_executable:
+ entities:
+ - scope: SERVICE
+ service: 'airflow::airflow-cluster'
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ value: 5.0
+ meter_airflow_executor_queued_tasks:
+ entities:
+ - scope: SERVICE
+ service: 'airflow::airflow-cluster'
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ value: 3.0
+ meter_airflow_executor_running_tasks:
+ entities:
+ - scope: SERVICE
+ service: 'airflow::airflow-cluster'
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ value: 2.0
+ meter_airflow_executor_open_slots:
+ entities:
+ - scope: SERVICE
+ service: 'airflow::airflow-cluster'
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ value: 16.0
+ meter_airflow_pool_scheduled_slots:
+ entities:
+ - scope: SERVICE
+ service: 'airflow::airflow-cluster'
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ pool_name: default_pool
+ value: 1.0
+ meter_airflow_dag_file_queue_size:
+ entities:
+ - scope: SERVICE
+ service: 'airflow::airflow-cluster'
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ value: 4.0
+ meter_airflow_dag_import_errors:
+ entities:
+ - scope: SERVICE
+ service: 'airflow::airflow-cluster'
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ value: 0.0
+ meter_airflow_dagbag_size:
+ entities:
+ - scope: SERVICE
+ service: 'airflow::airflow-cluster'
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ value: 42.0
+ meter_airflow_dag_total_parse_time:
+ entities:
+ - scope: SERVICE
+ service: 'airflow::airflow-cluster'
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ value: 3.5
+ meter_airflow_dag_file_refresh_error:
+ entities:
+ - scope: SERVICE
+ service: 'airflow::airflow-cluster'
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ value: 0.5
+ meter_airflow_asset_updates:
+ entities:
+ - scope: SERVICE
+ service: 'airflow::airflow-cluster'
+ layer: AIRFLOW
+ samples:
+ - labels:
+ cluster: 'airflow::airflow-cluster'
+ value: 3.5
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java
index bb702d5c1a..eba23be781 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java
@@ -39,7 +39,7 @@ import
org.apache.skywalking.oap.server.core.UnexpectedException;
* <p>Layers are persisted by ordinal (int). The ordinal space is partitioned
by tier so
* each registration channel has a non-overlapping range:
* <ul>
- * <li>{@code 0 – 9_999} — built-in {@code Layer.*} constants. Currently
{@code 0..49} in
+ * <li>{@code 0 – 9_999} — built-in {@code Layer.*} constants. Currently
{@code 0..50} in
* use; the rest of the range is reserved for future built-ins. Ordinals
already
* persisted in storage are frozen forever and must never be reused.</li>
* <li>{@code 10_000 – 99_999} — boot-time external layers: {@code
layer-extensions.yml},
@@ -298,6 +298,9 @@ public final class Layer {
/** Alipay Mini Program monitoring via mini-program-monitor SDK */
public static final Layer ALIPAY_MINI_PROGRAM =
register("ALIPAY_MINI_PROGRAM", 49, true);
+ /** Apache Airflow workflow orchestration (native OpenTelemetry metrics
via OTel Collector). */
+ public static final Layer AIRFLOW = register("AIRFLOW", 50, true);
+
private final String name;
private final int value;
/**
diff --git a/oap-server/server-starter/src/main/resources/application.yml
b/oap-server/server-starter/src/main/resources/application.yml
index 985d22bd1a..ec2d3d7bae 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -390,7 +390,7 @@ receiver-otel:
selector: ${SW_OTEL_RECEIVER:default}
default:
enabledHandlers:
${SW_OTEL_RECEIVER_ENABLED_HANDLERS:"otlp-traces,otlp-metrics,otlp-logs"}
- enabledOtelMetricsRules:
${SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES:"apisix,nginx/*,k8s/*,istio-controlplane,vm,mysql/*,postgresql/*,oap,aws-eks/*,windows,aws-s3/*,aws-dynamodb/*,aws-gateway/*,redis/*,elasticsearch/*,rabbitmq/*,mongodb/*,kafka/*,pulsar/*,bookkeeper/*,rocketmq/*,clickhouse/*,activemq/*,kong/*,flink/*,banyandb/*,envoy-ai-gateway/*,ios/*,miniprogram/*"}
+ enabledOtelMetricsRules:
${SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES:"apisix,nginx/*,k8s/*,istio-controlplane,vm,mysql/*,postgresql/*,oap,aws-eks/*,windows,aws-s3/*,aws-dynamodb/*,aws-gateway/*,redis/*,elasticsearch/*,rabbitmq/*,mongodb/*,kafka/*,pulsar/*,bookkeeper/*,rocketmq/*,clickhouse/*,activemq/*,kong/*,flink/*,airflow/*,banyandb/*,envoy-ai-gateway/*,ios/*,miniprogram/*"}
receiver-zipkin:
selector: ${SW_RECEIVER_ZIPKIN:-}
diff --git
a/oap-server/server-starter/src/main/resources/hierarchy-definition.yml
b/oap-server/server-starter/src/main/resources/hierarchy-definition.yml
index d8540809f6..fefff6914f 100644
--- a/oap-server/server-starter/src/main/resources/hierarchy-definition.yml
+++ b/oap-server/server-starter/src/main/resources/hierarchy-definition.yml
@@ -67,6 +67,9 @@ hierarchy:
KONG:
K8S_SERVICE: short-name
+ AIRFLOW:
+ K8S_SERVICE: short-name
+
VIRTUAL_DATABASE:
MYSQL: lower-short-name-with-fqdn
POSTGRESQL: lower-short-name-with-fqdn
@@ -118,6 +121,7 @@ layer-levels:
PULSAR: 2
ACTIVEMQ: 2
KONG: 2
+ AIRFLOW: 2
MESH_DP: 1
CILIUM_SERVICE: 1
diff --git a/oap-server/server-starter/src/main/resources/layer-extensions.yml
b/oap-server/server-starter/src/main/resources/layer-extensions.yml
index 9214cb94bc..fbe46595df 100644
--- a/oap-server/server-starter/src/main/resources/layer-extensions.yml
+++ b/oap-server/server-starter/src/main/resources/layer-extensions.yml
@@ -26,7 +26,7 @@
#
# Ordinal-range convention (enforced for the runtime tier, recommended for the
others):
#
-# 0 – 9_999 built-in Layer.* constants. Currently 0..49 in use;
the rest
+# 0 – 9_999 built-in Layer.* constants. Currently 0..50 in use;
the rest
# of the range is reserved for future built-ins added
by the
# OAP distribution.
# 10_000 – 99_999 boot-time external layers: this file
(layer-extensions.yml),
diff --git
a/oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-instance.yaml
b/oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-instance.yaml
new file mode 100644
index 0000000000..a3e374bfa7
--- /dev/null
+++
b/oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-instance.yaml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Airflow component-level metrics from native OpenTelemetry export (SWIP-7).
+# Instance identity uses resource `host.name` (scheduler / triggerer hostname).
+filter: "{ tags -> tags.job_name == 'Airflow' }"
+expSuffix: tag({tags -> tags.cluster = 'airflow::' +
tags.cluster}).instance(['cluster'], ['host_name'], Layer.AIRFLOW)
+metricPrefix: meter_airflow_instance
+metricsRules:
+ - name: pool_scheduled_slots
+ exp: airflow_pool_scheduled_slots.sum(['cluster', 'host_name',
'pool_name'])
+ - name: pool_open_slots
+ exp: airflow_pool_open_slots.sum(['cluster', 'host_name', 'pool_name'])
+ - name: pool_deferred_slots
+ exp: airflow_pool_deferred_slots.sum(['cluster', 'host_name', 'pool_name'])
+ - name: pool_running_slots
+ exp: airflow_pool_running_slots.sum(['cluster', 'host_name', 'pool_name'])
+ - name: scheduler_heartbeat
+ exp: airflow_scheduler_heartbeat.sum(['cluster',
'host_name']).increase('PT1M')
+ - name: executor_open_slots
+ exp: airflow_executor_open_slots.sum(['cluster', 'host_name'])
+ - name: executor_queued_tasks
+ exp: airflow_executor_queued_tasks.sum(['cluster', 'host_name'])
+ - name: triggerer_heartbeat
+ exp: airflow_triggerer_heartbeat.sum(['cluster',
'host_name']).increase('PT1M')
+ - name: triggers_running
+ exp: airflow_triggers_running.sum(['cluster', 'host_name'])
+ - name: triggerer_capacity_left
+ exp: airflow_triggerer_capacity_left.sum(['cluster', 'host_name'])
+ - name: triggers_blocked_main_thread
+ exp: airflow_triggers_blocked_main_thread.sum(['cluster',
'host_name']).increase('PT1M')
+ - name: triggers_failed
+ exp: airflow_triggers_failed.sum(['cluster', 'host_name']).increase('PT1M')
+ - name: triggers_succeeded
+ exp: airflow_triggers_succeeded.sum(['cluster',
'host_name']).increase('PT1M')
+ - name: executor_running_tasks
+ exp: airflow_executor_running_tasks.sum(['cluster', 'host_name'])
+ - name: asset_updates
+ exp: airflow_asset_updates.sum(['cluster', 'host_name']).increase('PT1M')
+ - name: asset_triggered_dagruns
+ exp: airflow_asset_triggered_dagruns.sum(['cluster',
'host_name']).increase('PT1M')
diff --git
a/oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-service.yaml
b/oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-service.yaml
new file mode 100644
index 0000000000..7c3bd07ba9
--- /dev/null
+++
b/oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-service.yaml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Airflow cluster-level metrics from native OpenTelemetry export (SWIP-7).
+# Requires resource attribute `cluster` (set via OTEL_RESOURCE_ATTRIBUTES or
Collector transform).
+# OAP maps resource `service.name` (default "Airflow") to tag `job_name`.
+filter: "{ tags -> tags.job_name == 'Airflow' }"
+expSuffix: tag({tags -> tags.cluster = 'airflow::' +
tags.cluster}).service(['cluster'], Layer.AIRFLOW)
+metricPrefix: meter_airflow
+metricsRules:
+ - name: scheduler_tasks_executable
+ exp: airflow_scheduler_tasks_executable.sum(['cluster'])
+ - name: executor_queued_tasks
+ exp: airflow_executor_queued_tasks.sum(['cluster'])
+ - name: executor_running_tasks
+ exp: airflow_executor_running_tasks.sum(['cluster'])
+ - name: executor_open_slots
+ exp: airflow_executor_open_slots.sum(['cluster'])
+ - name: pool_scheduled_slots
+ exp: airflow_pool_scheduled_slots.sum(['cluster', 'pool_name'])
+ - name: dag_file_queue_size
+ exp: airflow_dag_processing_file_path_queue_size.sum(['cluster'])
+ - name: dag_import_errors
+ exp: airflow_dag_processing_import_errors.sum(['cluster'])
+ - name: dagbag_size
+ exp: airflow_dagbag_size.sum(['cluster'])
+ - name: dag_total_parse_time
+ exp: airflow_dag_processing_total_parse_time.sum(['cluster'])
+ - name: dag_file_refresh_error
+ exp: airflow_dag_file_refresh_error.sum(['cluster']).increase('PT1M')
+ - name: asset_updates
+ exp: airflow_asset_updates.sum(['cluster']).increase('PT1M')
diff --git a/test/e2e-v2/cases/airflow/.gitignore
b/test/e2e-v2/cases/airflow/.gitignore
new file mode 100644
index 0000000000..b3f9b3a5eb
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/.gitignore
@@ -0,0 +1,19 @@
+# Runtime artifacts from local e2e runs (cluster compose mounts / ad-hoc local
runs)
+cluster/logs/**
+cluster/**/__pycache__/
+cluster/docker-compose.wsl-override.yml
+infra-e2e-logs/**
+infra-e2e-wsl-*.log
+*-test.log
+*-e2e-report.txt
+*-e2e-*.log
+mock-e2e-*
+cluster-e2e-*
+logs/wsl/**
+preflight-wsl.log
+license-header-wsl.log
+run-tests-all.log
+
+# Local-only e2e overrides (use deploy/skywalking dist, not CI compose)
+**/e2e.local.yaml
+**/docker-compose.local-e2e.yml
diff --git a/test/e2e-v2/cases/airflow/README.md
b/test/e2e-v2/cases/airflow/README.md
new file mode 100644
index 0000000000..6446d46821
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/README.md
@@ -0,0 +1,74 @@
+# Airflow monitoring e2e tests (SWIP-7)
+
+End-to-end tests for [SWIP-7](../../../../docs/en/swip/SWIP-7.md).
+
+```
+airflow/
+├── mock/ # CI "Airflow" — OTLP JSON replay (29 checks)
+├── cluster/ # CI "Airflow Cluster" — real Celery + native OTel (16 checks)
+└── README.md
+```
+
+MAL rules:
[`otel-rules/airflow/`](../../../../oap-server/server-starter/src/main/resources/otel-rules/airflow/)
+(**27** metrics: 11 Service + 16 Instance).
+
+## Run locally (same as CI)
+
+From the repository root, with
[skywalking-infra-e2e](https://github.com/apache/skywalking-infra-e2e)
+and [swctl](https://github.com/apache/skywalking-cli) installed:
+
+```bash
+# Mock — full 27-metric SWIP-7 contract via OTLP JSON replay
+e2e run -c test/e2e-v2/cases/airflow/mock/e2e.yaml
+
+# Cluster — native scheduler/triggerer OTel on a real Celery stack
+e2e run -c test/e2e-v2/cases/airflow/cluster/e2e.yaml
+```
+
+CI builds OAP with `make docker.all` then runs the same configs via
+[`.github/workflows/skywalking.yaml`](../../../../.github/workflows/skywalking.yaml)
+(matrix entries **Airflow** and **Airflow Cluster**).
+
+Each suite runs **2 entity checks** (the service on the `AIRFLOW` layer, plus its scheduler/triggerer instances),
+then **metric checks** via `metrics exec`.
+
+## Mock suite (`mock/`)
+
+Replays synthetic OTLP JSON — full **27**-metric SWIP-7 contract.
+
+| File | Role |
+|------|------|
+| [`e2e.yaml`](mock/e2e.yaml) | CI entry |
+| [`airflow-cases.yaml`](mock/airflow-cases.yaml) | 29 swctl checks |
+| [`docker-compose.yml`](mock/docker-compose.yml) | OAP + BanyanDB + OTLP
replay sidecar |
+|
[`mock-data/otel-airflow-metrics.json`](mock/mock-data/otel-airflow-metrics.json)
| Payload |
+
+## Cluster suite (`cluster/`)
+
+Real Airflow Celery cluster with native scheduler/triggerer OTel export — **14** native metrics
+(metrics that are flaky or empty in the CI cluster are omitted; the full matrix stays in mock).
+
+| File | Role |
+|------|------|
+| [`e2e.yaml`](cluster/e2e.yaml) | CI entry (timeout 50m, `fail-fast: false`) |
+| [`airflow-cases.yaml`](cluster/airflow-cases.yaml) | 16 swctl checks |
+| [`docker-compose.yml`](cluster/docker-compose.yml) | Airflow + Collector +
OAP |
+| [`dags/`](cluster/dags/) | Workload DAGs |
+| [`setup.sh`](cluster/setup.sh) | Health wait +
[`seed-workload.sh`](cluster/seed-workload.sh) |
+
+`seed-workload.sh` drives asset, deferrable, and load DAGs, then holds for
OTel export.
+Optional env: `ASSET_ROUNDS`, `DEFERRABLE_TRIGGERS`, `LOAD_BURSTS`,
`HOLD_SECONDS`, `OTEL_FLUSH_SECONDS`.
+
+## Coverage split
+
+| | Mock | Cluster |
+|---|------|---------|
+| Data | JSON replay | Native OTel |
+| Metrics | 27 (11 + 16) | 14 (5 + 9) |
+| Omitted in cluster (service) | — | `dag_file_queue_size`, `dagbag_size`,
`dag_total_parse_time`, `dag_import_errors`, `dag_file_refresh_error`,
`asset_updates` |
+| Omitted in cluster (instance) | — | `pool_deferred_slots`, `pool_running_slots`, `executor_queued_tasks` (instance only; service-level `executor_queued_tasks` is still checked), `asset_updates`, `triggers_blocked_main_thread`, `triggers_failed`, `triggers_succeeded` |
+
+## Related docs
+
+- [Airflow monitoring
setup](../../../../docs/en/setup/backend/backend-airflow-monitoring.md)
+- [SWIP-7 proposal](../../../../docs/en/swip/SWIP-7.md)
diff --git a/test/e2e-v2/cases/airflow/cluster/airflow-cases.yaml
b/test/e2e-v2/cases/airflow/cluster/airflow-cases.yaml
new file mode 100644
index 0000000000..4807b57101
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/cluster/airflow-cases.yaml
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Real Celery cluster integration smoke test (service airflow::airflow-e2e-cluster).
+# Full SWIP-7 matrix: mock/airflow-cases.yaml (OTLP JSON replay).
+# 2 entity-registration + 5 service + 9 instance metrics = 16 checks.
+#
+# Omitted vs mock (native export flaky or empty in CI cluster):
+# service — dag_file_queue_size, dagbag_size, dag_total_parse_time,
+# dag_import_errors, dag_file_refresh_error, asset_updates
+# instance — pool_deferred_slots, pool_running_slots, executor_queued_tasks,
+# asset_updates, triggers_blocked_main_thread, triggers_failed,
+# triggers_succeeded
+# Details: ../README.md#coverage-split
+
+cases:
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql service ly AIRFLOW
+ expected: expected/service.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql instance ls
--service-name=airflow::airflow-e2e-cluster
+ expected: ../mock/expected/instance.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_scheduler_tasks_executable
--service-name=airflow::airflow-e2e-cluster
+ expected: ../mock/expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_executor_queued_tasks
--service-name=airflow::airflow-e2e-cluster
+ expected: ../mock/expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_executor_running_tasks
--service-name=airflow::airflow-e2e-cluster
+ expected: ../mock/expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_executor_open_slots
--service-name=airflow::airflow-e2e-cluster
+ expected: ../mock/expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_pool_scheduled_slots
--service-name=airflow::airflow-e2e-cluster
+ expected: ../mock/expected/metrics-has-value-label-poolname.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_pool_open_slots
--service-name=airflow::airflow-e2e-cluster --instance-name=airflow-scheduler
+ expected: ../mock/expected/metrics-has-value-label-poolname.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_pool_scheduled_slots
--service-name=airflow::airflow-e2e-cluster --instance-name=airflow-scheduler
+ expected: ../mock/expected/metrics-has-value-label-poolname.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_triggerer_heartbeat
--service-name=airflow::airflow-e2e-cluster --instance-name=airflow-triggerer
+ expected: ../mock/expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_executor_running_tasks
--service-name=airflow::airflow-e2e-cluster --instance-name=airflow-scheduler
+ expected: ../mock/expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_scheduler_heartbeat
--service-name=airflow::airflow-e2e-cluster --instance-name=airflow-scheduler
+ expected: ../mock/expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_executor_open_slots
--service-name=airflow::airflow-e2e-cluster --instance-name=airflow-scheduler
+ expected: ../mock/expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_triggers_running
--service-name=airflow::airflow-e2e-cluster --instance-name=airflow-triggerer
+ expected: ../mock/expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_triggerer_capacity_left
--service-name=airflow::airflow-e2e-cluster --instance-name=airflow-triggerer
+ expected: ../mock/expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_asset_triggered_dagruns
--service-name=airflow::airflow-e2e-cluster --instance-name=airflow-scheduler
+ expected: ../mock/expected/metrics-has-value.yml
diff --git a/test/e2e-v2/cases/airflow/cluster/compose-env.sh
b/test/e2e-v2/cases/airflow/cluster/compose-env.sh
new file mode 100644
index 0000000000..bcf8770838
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/cluster/compose-env.sh
@@ -0,0 +1,53 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Shared docker compose helpers for the Airflow cluster e2e scripts.
+# skywalking-infra-e2e names the compose project {workspace}_e2e (e.g.
+# skywalking_e2e) rather than after the case folder, so it is resolved below.
+
+COMPOSE_FILE="${COMPOSE_FILE:-test/e2e-v2/cases/airflow/cluster/docker-compose.yml}"
+COMPOSE_OVERRIDE="${COMPOSE_OVERRIDE:-}"
+
+# Print the compose project name to use, in priority order:
+#   1. COMPOSE_PROJECT_NAME, when already set in the environment;
+#   2. the "<project>" prefix of a running "<project>-airflow-scheduler-<n>" container;
+#   3. the "skywalking_e2e" fallback.
+resolve_compose_project() {
+  if [[ -n "${COMPOSE_PROJECT_NAME:-}" ]]; then
+    echo "${COMPOSE_PROJECT_NAME}"
+    return
+  fi
+  local scheduler_container
+  # Stay quiet and keep going if docker is unavailable; the fallback applies then.
+  scheduler_container="$(docker ps --filter 'name=-airflow-scheduler-' --format '{{.Names}}' 2>/dev/null | head -n 1 || true)"
+  if [[ -n "${scheduler_container}" ]]; then
+    echo "${scheduler_container%-airflow-scheduler-*}"
+    return
+  fi
+  echo "skywalking_e2e"
+}
+
+COMPOSE_PROJECT_NAME="$(resolve_compose_project)"
+
+# Run "docker compose" for the resolved project against COMPOSE_FILE, layering
+# COMPOSE_OVERRIDE on top when set; all arguments are passed through unchanged.
+dc() {
+  local compose_args=(-p "${COMPOSE_PROJECT_NAME}" -f "${COMPOSE_FILE}")
+  if [[ -n "${COMPOSE_OVERRIDE}" ]]; then
+    compose_args+=(-f "${COMPOSE_OVERRIDE}")
+  fi
+  docker compose "${compose_args[@]}" "$@"
+}
diff --git a/test/e2e-v2/cases/airflow/cluster/dags/cluster_load.py
b/test/e2e-v2/cases/airflow/cluster/dags/cluster_load.py
new file mode 100644
index 0000000000..cab5a88ad6
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/cluster/dags/cluster_load.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.providers.standard.operators.bash import BashOperator
+
+# Sustained load for real-cluster e2e (queued / running / scheduled gauges): each run has 8 independent 75s sleep tasks.
+with DAG(
+ dag_id="cluster_load",
+ start_date=datetime(2024, 1, 1),
+ schedule=None,  # no schedule; runs are triggered externally (seed-workload.sh does so)
+ catchup=False,
+ tags=["swip7", "e2e", "load"],
+ max_active_runs=4,  # up to 4 runs x 8 tasks, more than AIRFLOW__CORE__PARALLELISM=8 in docker-compose.yml, so tasks queue
+) as dag:
+ for index in range(1, 9):  # sleep_1 .. sleep_8, with no dependencies declared between them
+ BashOperator(
+ task_id=f"sleep_{index}",
+ bash_command=f"echo load-{index}-start && sleep 75 && echo
load-{index}-done",
+ )
diff --git a/test/e2e-v2/cases/airflow/cluster/dags/cluster_smoke.py
b/test/e2e-v2/cases/airflow/cluster/dags/cluster_smoke.py
new file mode 100644
index 0000000000..80b19d4009
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/cluster/dags/cluster_smoke.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.providers.standard.operators.bash import BashOperator
+
+with DAG(  # minimal smoke DAG: a single bash task scheduled every minute
+ dag_id="cluster_smoke",
+ start_date=datetime(2024, 1, 1),
+ schedule=timedelta(minutes=1),
+ catchup=False,  # do not backfill the intervals since start_date
+ tags=["swip7", "e2e"],
+) as dag:
+ BashOperator(
+ task_id="ping",
+ bash_command="echo cluster-smoke-$(hostname)",  # $(hostname) is expanded by bash on the host that runs the task
+ )
diff --git a/test/e2e-v2/cases/airflow/cluster/dags/e2e_asset.py
b/test/e2e-v2/cases/airflow/cluster/dags/e2e_asset.py
new file mode 100644
index 0000000000..fd94e05960
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/cluster/dags/e2e_asset.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk import Asset
+
+E2E_ASSET = Asset("file:///tmp/swip7-e2e-asset")  # URI is only an identifier (no task writes this file); links producer outlet to consumer schedule
+
+with DAG(
+ dag_id="e2e_asset_producer",
+ start_date=datetime(2024, 1, 1),
+ schedule=None,  # triggered externally (seed-workload.sh)
+ catchup=False,
+ tags=["swip7", "e2e", "asset"],
+) as producer_dag:
+ BashOperator(
+ task_id="produce",
+ bash_command="echo swip7-asset-produce",
+ outlets=[E2E_ASSET],  # declares E2E_ASSET as this task's output, so its runs update the asset
+ )
+
+with DAG(
+ dag_id="e2e_asset_consumer",
+ start_date=datetime(2024, 1, 1),
+ schedule=[E2E_ASSET],  # asset-driven schedule: runs after E2E_ASSET is updated by the producer
+ catchup=False,
+ tags=["swip7", "e2e", "asset"],
+) as consumer_dag:
+ BashOperator(
+ task_id="consume",
+ bash_command="echo swip7-asset-consume",
+ )
diff --git a/test/e2e-v2/cases/airflow/cluster/dags/e2e_deferrable.py
b/test/e2e-v2/cases/airflow/cluster/dags/e2e_deferrable.py
new file mode 100644
index 0000000000..f62fd2d9fc
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/cluster/dags/e2e_deferrable.py
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor
+
+# Deferrable sensor so the triggerer process runs triggers and exports its native triggers_* OTel metrics.
+with DAG(
+ dag_id="e2e_deferrable",
+ start_date=datetime(2024, 1, 1),
+ schedule=None,
+ catchup=False,
+ tags=["swip7", "e2e", "deferrable"],
+) as dag:
+ TimeDeltaSensor(
+ task_id="defer_wait",
+ delta=timedelta(seconds=45),  # NOTE(review): target is relative to the run's data interval end per TimeDeltaSensor semantics — confirm for manual runs
+ deferrable=True,  # hand the wait off to the triggerer instead of holding a worker slot
+ )
diff --git a/test/e2e-v2/cases/airflow/cluster/docker-compose.yml
b/test/e2e-v2/cases/airflow/cluster/docker-compose.yml
new file mode 100644
index 0000000000..4025be273b
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/cluster/docker-compose.yml
@@ -0,0 +1,290 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Real Airflow Celery cluster e2e: scheduler and triggerer export native OTel metrics; api-server, dag-processor and workers have it disabled.
+# Airflow -> OTel Collector -> OAP (BanyanDB). Pair with cluster/e2e.yaml.
+
+x-airflow-otel-env: &airflow-otel-env  # OTel / metrics settings merged into every Airflow container's environment
+ _PIP_ADDITIONAL_REQUIREMENTS: "apache-airflow[otel]
opentelemetry-exporter-otlp-proto-http"
+ AIRFLOW__METRICS__OTEL_ON: "True"  # overridden to "False" on api-server, dag-processor and both workers
+ AIRFLOW__METRICS__OTEL_SERVICE: "Airflow"
+ AIRFLOW__METRICS__OTEL_HOST: otel-collector
+ AIRFLOW__METRICS__OTEL_PORT: "4318"
+ AIRFLOW__METRICS__OTEL_SSL_ACTIVE: "False"
+ AIRFLOW__METRICS__OTEL_INTERVAL_MILLISECONDS: "15000"  # export every 15s
+ AIRFLOW__METRICS__METRICS_ALLOW_LIST:
"scheduler,executor,dagrun,pool,triggerer,triggers,celery,asset,dag_processing,dagbag,dag_file"
+ OTEL_SERVICE_NAME: "Airflow"
+ OTEL_EXPORTER_OTLP_PROTOCOL: http/protobuf
+ OTEL_METRICS_EXPORTER: otlp
+ OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: http://otel-collector:4318/v1/metrics
+ OTEL_METRIC_EXPORT_INTERVAL: "15000"
+ AIRFLOW_CLUSTER: "airflow-e2e-cluster"
+ # Cap running tasks so cluster_load keeps executor_queued / pool_scheduled non-zero.
+ AIRFLOW__CORE__PARALLELISM: "8"
+ AIRFLOW__CELERY__WORKER_CONCURRENCY: "2"
+
+x-airflow-common: &airflow-common  # base service definition merged into every airflow-* service below
+ image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:3.0.6-python3.11}
+ environment: &airflow-common-env
+ <<: *airflow-otel-env
+ AIRFLOW__CORE__EXECUTOR: CeleryExecutor
+ AIRFLOW__DATABASE__SQL_ALCHEMY_CONN:
postgresql+psycopg2://airflow:airflow@postgres/airflow
+ AIRFLOW__CELERY__RESULT_BACKEND:
db+postgresql://airflow:airflow@postgres/airflow
+ AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
+ AIRFLOW__CORE__FERNET_KEY: ""
+ AIRFLOW__API_AUTH__JWT_SECRET: "swip7-e2e-airflow-jwt-secret"
+ AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "false"
+ AIRFLOW__CORE__LOAD_EXAMPLES: "true"  # seed-workload.sh triggers example_bash_operator / example_python_operator
+ AIRFLOW__CORE__AUTH_MANAGER:
airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
+ AIRFLOW__CORE__EXECUTION_API_SERVER_URL:
http://airflow-api-server:8080/execution/
+ AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: "true"  # serves the :8974/health endpoint probed by the scheduler healthcheck
+ volumes:
+ - ./dags:/opt/airflow/dags
+ - ./logs:/opt/airflow/logs
+ - ./config:/opt/airflow/config
+ user: "${AIRFLOW_UID:-50000}:0"
+ depends_on: &airflow-common-depends-on
+ redis:
+ condition: service_healthy
+ postgres:
+ condition: service_healthy
+ networks:
+ - e2e
+
+services:
+ oap:
+ extends:
+ file: ../../../script/docker-compose/base-compose.yml
+ service: oap
+ ports:
+ - 12800
+ environment:
+ SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES: "airflow/*"  # enable only the airflow/* OTel MAL rule files
+ networks:
+ - e2e
+ depends_on:
+ banyandb:
+ condition: service_healthy
+
+ banyandb:
+ extends:
+ file: ../../../script/docker-compose/base-compose.yml
+ service: banyandb
+ ports:
+ - 17912
+ networks:
+ e2e:
+ aliases:
+ - banyandb
+
+ otel-collector:
+ image: otel/opentelemetry-collector:${OTEL_COLLECTOR_VERSION}  # NOTE(review): version presumably supplied by the e2e env file (init-system-environment) — confirm
+ command: ["--config=/etc/otel-collector-config.yaml"]
+ volumes:
+ - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro
+ depends_on:
+ oap:
+ condition: service_healthy
+ networks:
+ - e2e
+
+ postgres:
+ image: postgres:13
+ environment:
+ POSTGRES_USER: airflow
+ POSTGRES_PASSWORD: airflow
+ POSTGRES_DB: airflow
+ healthcheck:
+ test: ["CMD", "pg_isready", "-U", "airflow"]
+ interval: 10s
+ retries: 5
+ start_period: 5s
+ networks:
+ - e2e
+
+ redis:
+ image: redis:7.2-bookworm
+ healthcheck:
+ test: ["CMD", "redis-cli", "ping"]
+ interval: 10s
+ timeout: 30s
+ retries: 50
+ start_period: 30s
+ networks:
+ - e2e
+
+ airflow-init:
+ <<: *airflow-common
+ entrypoint: /bin/bash
+ command:
+ - -c
+ - |
+ mkdir -p /opt/airflow/logs /opt/airflow/dags /opt/airflow/plugins
/opt/airflow/config
+ chown -R "${AIRFLOW_UID:-50000}:0" /opt/airflow/logs /opt/airflow/dags
/opt/airflow/plugins /opt/airflow/config || true
+ airflow db migrate
+ airflow users create --username "$${_AIRFLOW_WWW_USER_USERNAME}"
--password "$${_AIRFLOW_WWW_USER_PASSWORD}" --firstname Admin --lastname User
--role Admin --email [email protected] || true
+ exec airflow version
+ environment:
+ <<: *airflow-common-env
+ _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-admin}
+ _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-admin}
+ user: "0:0"  # root, so the chown above can reset ownership of the mounted directories
+ depends_on:
+ <<: *airflow-common-depends-on
+
+ airflow-api-server:
+ <<: *airflow-common
+ hostname: airflow-api-server
+ command: api-server
+ environment:
+ <<: *airflow-common-env
+ AIRFLOW__METRICS__OTEL_ON: "False"  # only scheduler and triggerer export metrics
+ OTEL_METRICS_EXPORTER: none
+ healthcheck:
+ test: ["CMD", "curl", "--fail", "http://localhost:8080/api/v2/version"]
+ interval: 30s
+ timeout: 10s
+ retries: 5
+ start_period: 90s
+ restart: always
+ depends_on:
+ <<: *airflow-common-depends-on
+ airflow-init:
+ condition: service_completed_successfully
+
+ airflow-dag-processor:
+ <<: *airflow-common
+ hostname: airflow-dag-processor
+ command: dag-processor
+ environment:
+ <<: *airflow-common-env
+ AIRFLOW__METRICS__OTEL_ON: "False"  # only scheduler and triggerer export metrics
+ OTEL_METRICS_EXPORTER: none
+ healthcheck:
+ test: ["CMD-SHELL", 'airflow jobs check --job-type DagProcessorJob
--hostname "$${HOSTNAME}"']
+ interval: 30s
+ timeout: 10s
+ retries: 5
+ start_period: 90s
+ restart: always
+ depends_on:
+ <<: *airflow-common-depends-on
+ airflow-init:
+ condition: service_completed_successfully
+ otel-collector:
+ condition: service_started
+
+ airflow-scheduler:
+ <<: *airflow-common
+ hostname: airflow-scheduler
+ command: scheduler
+ environment:
+ <<: *airflow-common-env
+ OTEL_RESOURCE_ATTRIBUTES:
"cluster=airflow-e2e-cluster,service.name=Airflow,host.name=airflow-scheduler"
+ healthcheck:
+ test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]  # served because AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK is "true"
+ interval: 30s
+ timeout: 10s
+ retries: 5
+ start_period: 90s
+ restart: always
+ depends_on:
+ <<: *airflow-common-depends-on
+ airflow-init:
+ condition: service_completed_successfully
+ airflow-api-server:
+ condition: service_healthy
+ otel-collector:
+ condition: service_started
+
+ airflow-triggerer:
+ <<: *airflow-common
+ hostname: airflow-triggerer
+ command: triggerer
+ environment:
+ <<: *airflow-common-env
+ OTEL_RESOURCE_ATTRIBUTES:
"cluster=airflow-e2e-cluster,service.name=Airflow,host.name=airflow-triggerer"
+ healthcheck:
+ test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob
--hostname "$${HOSTNAME}"']
+ interval: 30s
+ timeout: 10s
+ retries: 5
+ start_period: 90s
+ restart: always
+ depends_on:
+ <<: *airflow-common-depends-on
+ airflow-init:
+ condition: service_completed_successfully
+ airflow-api-server:
+ condition: service_healthy
+ otel-collector:
+ condition: service_started
+
+ airflow-worker-1:
+ <<: *airflow-common
+ hostname: airflow-worker-1
+ command: celery worker
+ environment:
+ <<: *airflow-common-env
+ AIRFLOW__METRICS__OTEL_ON: "False"  # workers only run tasks; no metrics export
+ OTEL_METRICS_EXPORTER: none
+ DUMB_INIT_SETSID: "0"
+ healthcheck:
+ test:
+ - "CMD-SHELL"
+ - 'celery --app airflow.providers.celery.executors.celery_executor.app
inspect ping -d "celery@$${HOSTNAME}" || celery --app
airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
+ interval: 30s
+ timeout: 10s
+ retries: 5
+ start_period: 90s
+ restart: always
+ depends_on:
+ <<: *airflow-common-depends-on
+ airflow-init:
+ condition: service_completed_successfully
+ airflow-api-server:
+ condition: service_healthy
+ otel-collector:
+ condition: service_started
+
+ airflow-worker-2:
+ <<: *airflow-common
+ hostname: airflow-worker-2
+ command: celery worker
+ environment:
+ <<: *airflow-common-env
+ AIRFLOW__METRICS__OTEL_ON: "False"  # workers only run tasks; no metrics export
+ OTEL_METRICS_EXPORTER: none
+ DUMB_INIT_SETSID: "0"
+ healthcheck:
+ test:
+ - "CMD-SHELL"
+ - 'celery --app airflow.providers.celery.executors.celery_executor.app
inspect ping -d "celery@$${HOSTNAME}" || celery --app
airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
+ interval: 30s
+ timeout: 10s
+ retries: 5
+ start_period: 90s
+ restart: always
+ depends_on:
+ <<: *airflow-common-depends-on
+ airflow-init:
+ condition: service_completed_successfully
+ airflow-api-server:
+ condition: service_healthy
+ otel-collector:
+ condition: service_started
+
+networks:
+ e2e:
diff --git a/test/e2e-v2/cases/airflow/cluster/e2e.yaml
b/test/e2e-v2/cases/airflow/cluster/e2e.yaml
new file mode 100644
index 0000000000..07390605fd
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/cluster/e2e.yaml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Real Airflow Celery cluster (scheduler + 2 workers + triggerer) with live OTLP metrics.
+# Slower than mock/e2e.yaml; a production-like smoke test of the OTLP integration.
+# The full SWIP-7 metric matrix is covered by mock/e2e.yaml (OTLP JSON replay).
+
+setup:
+ env: compose
+ file: docker-compose.yml
+ timeout: 50m
+ init-system-environment: ../../../script/env
+ steps:
+ - name: set PATH
+ command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH
+ - name: install yq
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
+ - name: install swctl
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
+ - name: cluster setup (health + workload)
+ command: /usr/bin/bash test/e2e-v2/cases/airflow/cluster/setup.sh  # waits for scheduler health, then seeds the DAG workload
+
+verify:
+ retry:
+ count: 60  # up to 60 attempts ...
+ interval: 3s  # ... 3s apart
+ fail-fast: false
+ cases:
+ - includes:
+ - ./airflow-cases.yaml
diff --git a/test/e2e-v2/cases/airflow/cluster/expected/service.yml
b/test/e2e-v2/cases/airflow/cluster/expected/service.yml
new file mode 100644
index 0000000000..23bd03fc35
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/cluster/expected/service.yml
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{{- containsOnce . }}
+- id: {{ b64enc "airflow::airflow-e2e-cluster" }}.1  # NOTE(review): ".1" presumably the normal-service flag matching normal: true — confirm
+ name: airflow::airflow-e2e-cluster  # "<group>::<shortname>", matching the group / shortname fields below
+ group: airflow
+ shortname: airflow-e2e-cluster
+ layers:
+ - AIRFLOW
+ normal: true
+{{- end }}
diff --git a/test/e2e-v2/cases/airflow/cluster/otel-collector-config.yaml
b/test/e2e-v2/cases/airflow/cluster/otel-collector-config.yaml
new file mode 100644
index 0000000000..1bb74a2eaf
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/cluster/otel-collector-config.yaml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+receivers:  # Airflow containers push OTLP metrics to this collector
+ otlp:
+ protocols:
+ http:
+ endpoint: 0.0.0.0:4318  # target of OTEL_EXPORTER_OTLP_METRICS_ENDPOINT in docker-compose.yml
+ grpc:
+ endpoint: 0.0.0.0:4317
+
+processors:
+ batch:  # default batching settings
+
+exporters:
+ otlp:  # OTLP over gRPC to the OAP server
+ endpoint: oap:11800
+ tls:
+ insecure: true  # plaintext inside the e2e network
+
+service:  # single metrics pipeline: otlp receiver -> batch -> otlp exporter (OAP)
+ pipelines:
+ metrics:
+ receivers:
+ - otlp
+ processors:
+ - batch
+ exporters:
+ - otlp
diff --git a/test/e2e-v2/cases/airflow/cluster/seed-workload.sh
b/test/e2e-v2/cases/airflow/cluster/seed-workload.sh
new file mode 100644
index 0000000000..34d3bd857f
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/cluster/seed-workload.sh
@@ -0,0 +1,118 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Sustained native OTel traffic for cluster e2e (mirrors
+# deploy/airflow-local/seed-demo-full.ps1).
+# Phases: DAG reserialize -> asset -> deferrable -> load bursts -> hold with
+# periodic top-up + DAG reserialize.
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+# shellcheck source=compose-env.sh
+source "${SCRIPT_DIR}/compose-env.sh"
+
+SCHEDULER="${AIRFLOW_SCHEDULER_SERVICE:-airflow-scheduler}"
+
+ASSET_ROUNDS="${ASSET_ROUNDS:-6}"
+DEFERRABLE_TRIGGERS="${DEFERRABLE_TRIGGERS:-8}"
+LOAD_BURSTS="${LOAD_BURSTS:-5}"
+HOLD_SECONDS="${HOLD_SECONDS:-${RUN_SECONDS:-180}}"
+TOPUP_INTERVAL="${TOPUP_INTERVAL:-30}"
+
+# The hold loop advances by TOPUP_INTERVAL per iteration, so zero or a
+# non-numeric value would never reach HOLD_SECONDS; reject it up front.
+if ! [[ "${TOPUP_INTERVAL}" =~ ^[1-9][0-9]*$ ]]; then
+  echo "error: TOPUP_INTERVAL must be a positive integer (got '${TOPUP_INTERVAL}')" >&2
+  exit 1
+fi
+
+ASSET_DAGS=(e2e_asset_producer e2e_asset_consumer)
+DEFERRABLE_DAG="e2e_deferrable"
+LOAD_DAG="cluster_load"
+WARMUP_DAGS=(cluster_smoke example_bash_operator example_python_operator)
+
+# Trigger one run of DAG "$1" from the scheduler container. Best effort: a
+# failure (e.g. DAG missing or paused) is reported on stderr but is not fatal.
+trigger_dag() {
+  local dag="$1"
+  if ! dc exec -T "${SCHEDULER}" airflow dags trigger "${dag}" >/dev/null 2>&1; then
+    echo " warn: trigger ${dag} failed (missing or paused?)" >&2
+  fi
+}
+
+# Re-serialize all DAGs from the scheduler container; failures are ignored.
+reserialize_dags() {
+  dc exec -T "${SCHEDULER}" airflow dags reserialize >/dev/null 2>&1 || true
+}
+
+echo "=== Airflow cluster e2e seed (project ${COMPOSE_PROJECT_NAME}) ==="
+echo " asset_rounds=${ASSET_ROUNDS} deferrable=${DEFERRABLE_TRIGGERS} load_bursts=${LOAD_BURSTS} hold=${HOLD_SECONDS}s"
+
+echo "[1/4] Sync DAG metadata (dagbag / parse metrics)..."
+reserialize_dags
+sleep 8
+
+echo "[2/4] Asset DAGs (${ASSET_ROUNDS} rounds) -> asset_triggered_dagruns..."
+for round in $(seq 1 "${ASSET_ROUNDS}"); do
+  for dag in "${ASSET_DAGS[@]}"; do
+    trigger_dag "${dag}"
+  done
+  if [[ "${round}" -lt "${ASSET_ROUNDS}" ]]; then
+    sleep 8
+  fi
+done
+
+echo "[3/4] Deferrable sensors (${DEFERRABLE_TRIGGERS} triggers) -> triggerer triggers_* / heartbeat..."
+for _ in $(seq 1 "${DEFERRABLE_TRIGGERS}"); do
+  trigger_dag "${DEFERRABLE_DAG}"
+  sleep 4
+done
+
+echo "[4/4] Load bursts (${LOAD_BURSTS} x ${LOAD_DAG}) -> executor queued/running, pool scheduled..."
+for dag in "${WARMUP_DAGS[@]}"; do
+  trigger_dag "${dag}"
+done
+for burst in $(seq 1 "${LOAD_BURSTS}"); do
+  echo " burst ${burst}/${LOAD_BURSTS}"
+  trigger_dag "${LOAD_DAG}"
+  sleep 3
+done
+
+echo "Hold ${HOLD_SECONDS}s with top-up (sustain queue + deferrable + asset + dagbag refresh)..."
+elapsed=0
+while [[ "${elapsed}" -lt "${HOLD_SECONDS}" ]]; do
+  step="${TOPUP_INTERVAL}"
+  if [[ $((HOLD_SECONDS - elapsed)) -lt "${step}" ]]; then
+    step=$((HOLD_SECONDS - elapsed))
+  fi
+  sleep "${step}"
+  elapsed=$((elapsed + step))
+  if [[ "${elapsed}" -lt "${HOLD_SECONDS}" ]]; then
+    trigger_dag "${LOAD_DAG}"
+    trigger_dag "${DEFERRABLE_DAG}"
+    trigger_dag "e2e_asset_producer"
+    reserialize_dags
+    echo " ... ${elapsed}s / ${HOLD_SECONDS}s (topped up load + deferrable + asset + reserialize)"
+  fi
+done
+
+# Allow at least two OTel export cycles before verify (interval configured in docker-compose).
+OTEL_FLUSH_SECONDS="${OTEL_FLUSH_SECONDS:-35}"
+echo "OTel flush wait ${OTEL_FLUSH_SECONDS}s..."
+sleep "${OTEL_FLUSH_SECONDS}"
+
+echo "Workload seed complete."
diff --git a/test/e2e-v2/cases/airflow/cluster/setup.sh
b/test/e2e-v2/cases/airflow/cluster/setup.sh
new file mode 100644
index 0000000000..7e9b263cc0
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/cluster/setup.sh
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Cluster e2e setup entry point: block until the scheduler is healthy, then
+# seed the DAG workload; any failing step aborts setup (set -e).
+set -euo pipefail
+
+case_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+
+for step_script in wait-scheduler-healthy.sh seed-workload.sh; do
+  /usr/bin/bash "${case_dir}/${step_script}"
+done
+echo "Airflow cluster setup finished."
diff --git a/test/e2e-v2/cases/airflow/cluster/wait-scheduler-healthy.sh
b/test/e2e-v2/cases/airflow/cluster/wait-scheduler-healthy.sh
new file mode 100644
index 0000000000..bafe39b873
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/cluster/wait-scheduler-healthy.sh
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+# shellcheck source=compose-env.sh
+source "${SCRIPT_DIR}/compose-env.sh"
+
+SCHEDULER="${AIRFLOW_SCHEDULER_SERVICE:-airflow-scheduler}"
+MAX_ATTEMPTS="${SCHEDULER_HEALTH_ATTEMPTS:-90}"
+SLEEP_SECONDS="${SCHEDULER_HEALTH_INTERVAL_SECONDS:-10}"
+
+echo "Waiting for ${SCHEDULER} (compose project ${COMPOSE_PROJECT_NAME})..."
+
+for _ in $(seq 1 "${MAX_ATTEMPTS}"); do
+ if dc exec -T "${SCHEDULER}" \
+ airflow jobs check --job-type SchedulerJob --hostname "${SCHEDULER}"; then
+ echo "Airflow scheduler healthy"
+ exit 0
+ fi
+ sleep "${SLEEP_SECONDS}"
+done
+
+echo "Airflow scheduler did not become healthy in time"
+exit 1
diff --git a/test/e2e-v2/cases/airflow/mock/Dockerfile.mock-sender
b/test/e2e-v2/cases/airflow/mock/Dockerfile.mock-sender
new file mode 100644
index 0000000000..62c44ca2f7
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/mock/Dockerfile.mock-sender
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ARG PYTHON_IMAGE=python:3.11-slim
+FROM ${PYTHON_IMAGE}
+# Image for the mock sender: runs otlp_replay_server.py (see CMD below).
+WORKDIR /app
+
+RUN apt-get update \
+ && apt-get install -y --no-install-recommends netcat-openbsd curl \
+ && rm -rf /var/lib/apt/lists/*
+# Install Python deps before copying the script so script-only edits reuse this layer.
+COPY requirements-replay.txt .
+RUN pip install --no-cache-dir -r requirements-replay.txt
+
+COPY otlp_replay_server.py .
+# NOTE(review): 9093 is presumably the replay server's listen port — confirm in otlp_replay_server.py.
+EXPOSE 9093
+
+CMD ["python", "otlp_replay_server.py"]
diff --git a/test/e2e-v2/cases/airflow/mock/airflow-cases.yaml
b/test/e2e-v2/cases/airflow/mock/airflow-cases.yaml
new file mode 100644
index 0000000000..3db93ab2db
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/mock/airflow-cases.yaml
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Full SWIP-7 metric matrix via mock OTLP replay (27 metrics; see cluster/airflow-cases.yaml for the native smoke test).
+# 2 entity-registration checks + 11 service + 16 instance metrics = 29 checks.
+
+cases:
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql service ly AIRFLOW
+ expected: expected/service.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql instance ls
--service-name=airflow::airflow-cluster
+ expected: expected/instance.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_scheduler_tasks_executable
--service-name=airflow::airflow-cluster
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_executor_queued_tasks
--service-name=airflow::airflow-cluster
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_executor_running_tasks
--service-name=airflow::airflow-cluster
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_executor_open_slots
--service-name=airflow::airflow-cluster
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_pool_scheduled_slots
--service-name=airflow::airflow-cluster
+ expected: expected/metrics-has-value-label-poolname.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_dag_file_queue_size
--service-name=airflow::airflow-cluster
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_dag_import_errors
--service-name=airflow::airflow-cluster
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_dagbag_size --service-name=airflow::airflow-cluster
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_dag_total_parse_time
--service-name=airflow::airflow-cluster
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_dag_file_refresh_error
--service-name=airflow::airflow-cluster
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_asset_updates --service-name=airflow::airflow-cluster
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_pool_open_slots
--service-name=airflow::airflow-cluster --instance-name=airflow-scheduler
+ expected: expected/metrics-has-value-label-poolname.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_pool_deferred_slots
--service-name=airflow::airflow-cluster --instance-name=airflow-scheduler
+ expected: expected/metrics-has-value-label-poolname.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_pool_running_slots
--service-name=airflow::airflow-cluster --instance-name=airflow-scheduler
+ expected: expected/metrics-has-value-label-poolname.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_pool_scheduled_slots
--service-name=airflow::airflow-cluster --instance-name=airflow-scheduler
+ expected: expected/metrics-has-value-label-poolname.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_executor_running_tasks
--service-name=airflow::airflow-cluster --instance-name=airflow-scheduler
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_scheduler_heartbeat
--service-name=airflow::airflow-cluster --instance-name=airflow-scheduler
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_executor_open_slots
--service-name=airflow::airflow-cluster --instance-name=airflow-scheduler
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_executor_queued_tasks
--service-name=airflow::airflow-cluster --instance-name=airflow-scheduler
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_asset_updates
--service-name=airflow::airflow-cluster --instance-name=airflow-scheduler
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_asset_triggered_dagruns
--service-name=airflow::airflow-cluster --instance-name=airflow-scheduler
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_triggerer_heartbeat
--service-name=airflow::airflow-cluster --instance-name=airflow-triggerer
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_triggers_running
--service-name=airflow::airflow-cluster --instance-name=airflow-triggerer
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_triggerer_capacity_left
--service-name=airflow::airflow-cluster --instance-name=airflow-triggerer
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_triggers_blocked_main_thread
--service-name=airflow::airflow-cluster --instance-name=airflow-triggerer
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_triggers_failed
--service-name=airflow::airflow-cluster --instance-name=airflow-triggerer
+ expected: expected/metrics-has-value.yml
+ - query: swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec
--expression=meter_airflow_instance_triggers_succeeded
--service-name=airflow::airflow-cluster --instance-name=airflow-triggerer
+ expected: expected/metrics-has-value.yml
diff --git a/test/e2e-v2/cases/airflow/mock/docker-compose.yml
b/test/e2e-v2/cases/airflow/mock/docker-compose.yml
new file mode 100644
index 0000000000..22589350b5
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/mock/docker-compose.yml
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+services:
+ oap:
+ extends:
+ file: ../../../script/docker-compose/base-compose.yml
+ service: oap
+ ports:
+ - 12800
+ environment:
+ SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES: "airflow/*"
+ networks:
+ - e2e
+
+ banyandb:
+ extends:
+ file: ../../../script/docker-compose/base-compose.yml
+ service: banyandb
+ ports:
+ - 17912
+
+ sender:
+ build:
+ context: .
+ dockerfile: Dockerfile.mock-sender
+ args:
+ PYTHON_IMAGE: python:3.11-slim
+ volumes:
+ - ./mock-data:/data/otel-metrics:ro
+ environment:
+ OAP_HOST: oap
+ OAP_GRPC_PORT: 11800
+ OTEL_METRICS_DATA_PATH: /data/otel-metrics
+ networks:
+ - e2e
+ ports:
+ - 9093
+ healthcheck:
+ test: ["CMD", "sh", "-c", "nc -nz 127.0.0.1 9093"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+ depends_on:
+ oap:
+ condition: service_healthy
+
+networks:
+ e2e:
diff --git a/test/e2e-v2/cases/airflow/mock/e2e.yaml
b/test/e2e-v2/cases/airflow/mock/e2e.yaml
new file mode 100644
index 0000000000..35b1ef731f
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/mock/e2e.yaml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+setup:
+ env: compose
+ file: docker-compose.yml
+ timeout: 20m
+ init-system-environment: ../../../script/env
+ steps:
+ - name: set PATH
+ command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH
+ - name: install yq
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
+ - name: install swctl
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
+
+trigger:
+ action: http
+ interval: 3s
+ times: -1
+ url: http://${sender_host}:${sender_9093}/otel-metrics/send
+ method: GET
+
+verify:
+ retry:
+ count: 20
+ interval: 3s
+ cases:
+ - includes:
+ - ./airflow-cases.yaml
diff --git a/test/e2e-v2/cases/airflow/mock/expected/instance.yml
b/test/e2e-v2/cases/airflow/mock/expected/instance.yml
new file mode 100644
index 0000000000..e725b94c33
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/mock/expected/instance.yml
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{{- contains . }}
+- id: {{ notEmpty .id }}
+ name: airflow-scheduler
+ instanceuuid: {{ notEmpty .instanceuuid }}
+ attributes: []
+ language: UNKNOWN
+- id: {{ notEmpty .id }}
+ name: airflow-triggerer
+ instanceuuid: {{ notEmpty .instanceuuid }}
+ attributes: []
+ language: UNKNOWN
+{{- end }}
diff --git
a/test/e2e-v2/cases/airflow/mock/expected/metrics-has-value-label-poolname.yml
b/test/e2e-v2/cases/airflow/mock/expected/metrics-has-value-label-poolname.yml
new file mode 100644
index 0000000000..a590468712
--- /dev/null
+++
b/test/e2e-v2/cases/airflow/mock/expected/metrics-has-value-label-poolname.yml
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+debuggingtrace: null
+type: TIME_SERIES_VALUES
+results:
+ {{- contains .results }}
+ - metric:
+ labels:
+ {{- contains .metric.labels }}
+ - key: pool_name
+ value: {{ notEmpty .value }}
+ {{- end}}
+ values:
+ {{- contains .values }}
+ - id: {{ notEmpty .id }}
+ value: {{ notEmpty .value }}
+ traceid: null
+ owner: null
+ {{- end}}
+ {{- end}}
+error: null
diff --git a/test/e2e-v2/cases/airflow/mock/expected/metrics-has-value.yml
b/test/e2e-v2/cases/airflow/mock/expected/metrics-has-value.yml
new file mode 100644
index 0000000000..ced9ce2891
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/mock/expected/metrics-has-value.yml
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+debuggingtrace: null
+type: TIME_SERIES_VALUES
+results:
+ {{- contains .results }}
+ - metric:
+ labels: []
+ values:
+ {{- contains .values }}
+ - id: {{ notEmpty .id }}
+ value: {{ notEmpty .value }}
+ traceid: null
+ owner: null
+ {{- end}}
+ {{- end}}
+error: null
diff --git a/test/e2e-v2/cases/airflow/mock/expected/service.yml
b/test/e2e-v2/cases/airflow/mock/expected/service.yml
new file mode 100644
index 0000000000..46998441ee
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/mock/expected/service.yml
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{{- containsOnce . }}
+- id: {{ b64enc "airflow::airflow-cluster" }}.1
+ name: airflow::airflow-cluster
+ group: airflow
+ shortname: airflow-cluster
+ layers:
+ - AIRFLOW
+ normal: true
+{{- end }}
diff --git a/test/e2e-v2/cases/airflow/mock/mock-data/otel-airflow-metrics.json
b/test/e2e-v2/cases/airflow/mock/mock-data/otel-airflow-metrics.json
new file mode 100644
index 0000000000..3549817933
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/mock/mock-data/otel-airflow-metrics.json
@@ -0,0 +1,364 @@
+{
+ "resourceMetrics": [
+ {
+ "resource": {
+ "attributes": [
+ {
+ "key": "service.name",
+ "value": {
+ "stringValue": "Airflow"
+ }
+ },
+ {
+ "key": "cluster",
+ "value": {
+ "stringValue": "airflow-cluster"
+ }
+ },
+ {
+ "key": "host.name",
+ "value": {
+ "stringValue": "airflow-scheduler"
+ }
+ }
+ ]
+ },
+ "scopeMetrics": [
+ {
+ "scope": {},
+ "metrics": [
+ {
+ "name": "airflow.scheduler_heartbeat",
+ "sum": {
+ "aggregationTemporality": 2,
+ "isMonotonic": true,
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "1676140244999000000",
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 6.0
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.scheduler.tasks.executable",
+ "gauge": {
+ "dataPoints": [
+ {
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 5.0
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.executor.running_tasks",
+ "gauge": {
+ "dataPoints": [
+ {
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 2.0
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.pool.scheduled_slots",
+ "gauge": {
+ "dataPoints": [
+ {
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 1.0,
+ "attributes": [
+ {
+ "key": "pool_name",
+ "value": {
+ "stringValue": "default_pool"
+ }
+ }
+ ]
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.executor.queued_tasks",
+ "gauge": {
+ "dataPoints": [
+ {
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 3.0
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.executor.open_slots",
+ "gauge": {
+ "dataPoints": [
+ {
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 16.0
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.pool.deferred_slots",
+ "gauge": {
+ "dataPoints": [
+ {
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 0.0,
+ "attributes": [
+ {
+ "key": "pool_name",
+ "value": {
+ "stringValue": "default_pool"
+ }
+ }
+ ]
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.pool.open_slots",
+ "gauge": {
+ "dataPoints": [
+ {
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 128.0,
+ "attributes": [
+ {
+ "key": "pool_name",
+ "value": {
+ "stringValue": "default_pool"
+ }
+ }
+ ]
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.pool.running_slots",
+ "gauge": {
+ "dataPoints": [
+ {
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 0.0,
+ "attributes": [
+ {
+ "key": "pool_name",
+ "value": {
+ "stringValue": "default_pool"
+ }
+ }
+ ]
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.dag_processing.file_path_queue_size",
+ "gauge": {
+ "dataPoints": [
+ {
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 4.0
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.dag_processing.import_errors",
+ "gauge": {
+ "dataPoints": [
+ {
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 1.0
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.dagbag_size",
+ "gauge": {
+ "dataPoints": [
+ {
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 42.0
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.dag_processing.total_parse_time",
+ "gauge": {
+ "dataPoints": [
+ {
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 3.5
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.dag_file_refresh_error",
+ "sum": {
+ "aggregationTemporality": 2,
+ "isMonotonic": true,
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "1676140244999000000",
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 1.0
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.asset.updates",
+ "sum": {
+ "aggregationTemporality": 2,
+ "isMonotonic": true,
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "1676140244999000000",
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 7.0
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.asset.triggered_dagruns",
+ "sum": {
+ "aggregationTemporality": 2,
+ "isMonotonic": true,
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "1676140244999000000",
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 2.0
+ }
+ ]
+ }
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "resource": {
+ "attributes": [
+ {
+ "key": "service.name",
+ "value": {
+ "stringValue": "Airflow"
+ }
+ },
+ {
+ "key": "cluster",
+ "value": {
+ "stringValue": "airflow-cluster"
+ }
+ },
+ {
+ "key": "host.name",
+ "value": {
+ "stringValue": "airflow-triggerer"
+ }
+ }
+ ]
+ },
+ "scopeMetrics": [
+ {
+ "scope": {},
+ "metrics": [
+ {
+ "name": "airflow.triggerer_heartbeat",
+ "sum": {
+ "aggregationTemporality": 2,
+ "isMonotonic": true,
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "1676140244999000000",
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 5.0
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.triggers.running",
+ "gauge": {
+ "dataPoints": [
+ {
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 2.0
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.triggerer.capacity_left",
+ "gauge": {
+ "dataPoints": [
+ {
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 998.0
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.triggers.blocked_main_thread",
+ "sum": {
+ "aggregationTemporality": 2,
+ "isMonotonic": true,
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "1676140244999000000",
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 0.0
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.triggers.failed",
+ "sum": {
+ "aggregationTemporality": 2,
+ "isMonotonic": true,
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "1676140244999000000",
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 0.0
+ }
+ ]
+ }
+ },
+ {
+ "name": "airflow.triggers.succeeded",
+ "sum": {
+ "aggregationTemporality": 2,
+ "isMonotonic": true,
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "1676140244999000000",
+ "timeUnixNano": "1676140375004000000",
+ "asDouble": 3.0
+ }
+ ]
+ }
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/test/e2e-v2/cases/airflow/mock/otlp_replay_server.py
b/test/e2e-v2/cases/airflow/mock/otlp_replay_server.py
new file mode 100644
index 0000000000..e3d2266eb2
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/mock/otlp_replay_server.py
@@ -0,0 +1,106 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Airflow mock e2e OTLP JSON replay sidecar (Python)."""
+
+import glob
+import json
+import logging
+import os
+import threading
+import time
+
+import grpc
+from flask import Flask
+from google.protobuf import json_format
+from opentelemetry.proto.collector.metrics.v1 import metrics_service_pb2
+from opentelemetry.proto.collector.metrics.v1 import metrics_service_pb2_grpc
+
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
LOG = logging.getLogger("otlp-replay")

# OAP gRPC endpoint the replayed metrics are exported to (overridable via env).
OAP_HOST = os.environ.get("OAP_HOST", "127.0.0.1")
OAP_GRPC_PORT = int(os.environ.get("OAP_GRPC_PORT", "11800"))
# Directory whose *.json files are replayed as OTLP ExportMetricsServiceRequest payloads.
DATA_DIR = os.environ.get("OTEL_METRICS_DATA_PATH", "/data/otel-metrics")

# Replay counter, incremented once per send request. Guarded by SEQ_LOCK because
# the Flask app is started with threaded=True.
SEND_SEQ = 0
SEQ_LOCK = threading.Lock()

app = Flask(__name__)
+
+
def rewrite_node(node, nano_time, start_nano_time, seq):
    """Recursively refresh an OTLP/JSON payload in place so it looks freshly emitted.

    Walks every dict and list in ``node``:

    - every ``timeUnixNano`` becomes ``str(nano_time)`` and every
      ``startTimeUnixNano`` becomes ``str(start_nano_time)``;
    - data points that carry a ``startTimeUnixNano`` (the cumulative ``sum``
      points in the mock data) get ``seq`` added to their value, so successive
      replays produce a growing counter. ``asDouble`` stays a float and
      ``asInt`` stays a decimal string (OTLP/JSON encodes int64 as a string).

    Gauge points (no ``startTimeUnixNano``) keep their value unchanged.

    Args:
        node: Parsed JSON value (dict, list, or scalar); mutated in place.
        nano_time: Timestamp in nanoseconds to write into ``timeUnixNano``.
        start_nano_time: Timestamp in nanoseconds to write into ``startTimeUnixNano``.
        seq: Amount added to cumulative data-point values.

    Returns:
        None.
    """
    if isinstance(node, dict):
        if "timeUnixNano" in node:
            node["timeUnixNano"] = str(nano_time)
        if "startTimeUnixNano" in node:
            node["startTimeUnixNano"] = str(start_nano_time)
            if "asDouble" in node:
                node["asDouble"] = float(node["asDouble"]) + seq
            if "asInt" in node:
                node["asInt"] = str(int(node["asInt"]) + seq)
        for value in node.values():
            rewrite_node(value, nano_time, start_nano_time, seq)
    elif isinstance(node, list):
        for item in node:
            rewrite_node(item, nano_time, start_nano_time, seq)
+
+
def send_metrics_once():
    """Replay every OTLP JSON payload in DATA_DIR to the OAP gRPC metrics service.

    Each call increments the global replay sequence number, then for each
    ``*.json`` file in DATA_DIR (sorted by path) loads it and rewrites its
    timestamps via ``rewrite_node``. Timestamps are set to "now", with start
    time = now - 60s, and cumulative values are bumped by the sequence number.
    Each file is then exported as an ``ExportMetricsServiceRequest``.

    Returns:
        "ok" when every file was exported. Otherwise an error message: the data
        path is not a directory, it contains no JSON files, or the first gRPC
        error (remaining files are then skipped).

    Raises:
        Errors from reading or parsing a payload file propagate to the caller;
        the gRPC channel is still closed in that case.
    """
    global SEND_SEQ
    with SEQ_LOCK:
        SEND_SEQ += 1
        # Read under the lock so concurrent requests never share a sequence number.
        seq = SEND_SEQ

    if not os.path.isdir(DATA_DIR):
        msg = f"The path must be a folder: {DATA_DIR}"
        LOG.error(msg)
        return msg

    json_files = sorted(glob.glob(os.path.join(DATA_DIR, "*.json")))
    if not json_files:
        msg = f"The folder doesn't contain any json file: {DATA_DIR}"
        LOG.error(msg)
        return msg

    nano_time = time.time_ns()
    start_nano_time = nano_time - 60_000_000_000

    # The context manager closes the channel on every exit path, including
    # file-read and JSON/protobuf parse errors.
    with grpc.insecure_channel(f"{OAP_HOST}:{OAP_GRPC_PORT}") as channel:
        stub = metrics_service_pb2_grpc.MetricsServiceStub(channel)
        for path in json_files:
            with open(path, encoding="utf-8") as handle:
                payload = json.load(handle)
            rewrite_node(payload, nano_time, start_nano_time, seq)
            request = metrics_service_pb2.ExportMetricsServiceRequest()
            json_format.ParseDict(payload, request, ignore_unknown_fields=True)
            try:
                stub.Export(request, timeout=10)
            except grpc.RpcError as error:
                LOG.error("sendOtelMetrics by template error: %s", error)
                return str(error)

    return "ok"
+
+
@app.route("/otel-metrics/send")
def send_endpoint():
    """HTTP handler: replay the mock OTLP payloads once and return the outcome text.

    The e2e harness hits this route repeatedly with GET (Flask's default route
    method). The response body is the string returned by ``send_metrics_once``:
    "ok" or an error message.
    """
    return send_metrics_once()
+
+
if __name__ == "__main__":
    # Bind on all interfaces on port 9093. The compose healthcheck probes it
    # locally, and the e2e trigger reaches it through the mapped port.
    # threaded=True lets overlapping send requests run concurrently.
    app.run(
        threaded=True,
        port=9093,
        host="0.0.0.0",
    )
diff --git a/test/e2e-v2/cases/airflow/mock/requirements-replay.txt
b/test/e2e-v2/cases/airflow/mock/requirements-replay.txt
new file mode 100644
index 0000000000..80c4250e3b
--- /dev/null
+++ b/test/e2e-v2/cases/airflow/mock/requirements-replay.txt
@@ -0,0 +1,4 @@
+flask==3.0.3
+grpcio==1.62.2
+protobuf==4.25.3
+opentelemetry-proto==1.24.0
diff --git a/test/e2e-v2/cases/storage/expected/config-dump.yml
b/test/e2e-v2/cases/storage/expected/config-dump.yml
index a95bda9e18..e2a37ee996 100644
--- a/test/e2e-v2/cases/storage/expected/config-dump.yml
+++ b/test/e2e-v2/cases/storage/expected/config-dump.yml
@@ -176,7 +176,7 @@
"receiver-log.provider": "default",
"receiver-meter.provider": "default",
"receiver-otel.default.enabledHandlers":
"otlp-traces,otlp-metrics,otlp-logs",
- "receiver-otel.default.enabledOtelMetricsRules":
"apisix,nginx/*,k8s/*,istio-controlplane,vm,mysql/*,postgresql/*,oap,aws-eks/*,windows,aws-s3/*,aws-dynamodb/*,aws-gateway/*,redis/*,elasticsearch/*,rabbitmq/*,mongodb/*,kafka/*,pulsar/*,bookkeeper/*,rocketmq/*,clickhouse/*,activemq/*,kong/*,flink/*,banyandb/*,envoy-ai-gateway/*,ios/*,miniprogram/*",
+ "receiver-otel.default.enabledOtelMetricsRules":
"apisix,nginx/*,k8s/*,istio-controlplane,vm,mysql/*,postgresql/*,oap,aws-eks/*,windows,aws-s3/*,aws-dynamodb/*,aws-gateway/*,redis/*,elasticsearch/*,rabbitmq/*,mongodb/*,kafka/*,pulsar/*,bookkeeper/*,rocketmq/*,clickhouse/*,activemq/*,kong/*,flink/*,airflow/*,banyandb/*,envoy-ai-gateway/*,ios/*,miniprogram/*",
"receiver-otel.provider": "default",
"receiver-pprof.default.memoryParserEnabled": "true",
"receiver-pprof.default.pprofMaxSize": "31457280",