arpitrathore opened a new issue, #67109:
URL: https://github.com/apache/airflow/issues/67109
### Description
The InfluxDB 3 support added in #58929 provides `InfluxDB3Hook` and
`InfluxDB3Operator` but no dedicated sensor. Today, users who need to gate
downstream tasks on the arrival of data in an InfluxDB 3 measurement have to
wrap `InfluxDB3Hook` in a `PythonSensor`, which works but lacks the
discoverability, deferrable semantics, and consistent API of a dedicated sensor.
This issue proposes adding a dedicated sensor (exact class name and API to
be decided in review) to the influxdb provider, following the pattern
established by:
- `SqlSensor` (`airflow.providers.common.sql.sensors.sql`). Generic
SQL-truthy polling.
- `AwaitMessageSensor` (`airflow.providers.apache.kafka.sensors.kafka`).
Deferrable Kafka message wait.
- `BigQueryTablePartitionExistenceSensor`
(`airflow.providers.google.cloud.sensors.bigquery`). Partition existence check.
- `HivePartitionSensor` and `NamedHivePartitionSensor`
(`airflow.providers.apache.hive.sensors.hive_partition`).
#### Proposed API
Two complementary designs. Final naming and signatures are open for
discussion in review.
**Option A (recommended first cut): generic SQL-truthy sensor, modelled on
`SqlSensor`.** Smallest delivery, broadest applicability.
```python
from airflow.providers.influxdb.sensors.influxdb3 import InfluxDB3Sensor

wait_for_data = InfluxDB3Sensor(
    task_id="wait_for_data",
    influxdb3_conn_id="influxdb3_ro",
    sql="""
        SELECT 1 FROM "events"
        WHERE time >= '{{ data_interval_start }}'
          AND time < '{{ data_interval_end }}'
        LIMIT 1
    """,
    deferrable=True,
    poke_interval=60,
    timeout=3600,
)
```
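For discussion, the "SQL-truthy" check in Option A could be sketched as below. This is only an illustration: `first_cell_is_truthy` is a hypothetical helper name, and the falsy set `(0, "0", "", None)` is an assumption borrowed from the rule described in the `SqlSensor` docstring, not a decided behaviour for the new sensor.

```python
from typing import Any, Sequence


def first_cell_is_truthy(rows: Sequence[Sequence[Any]]) -> bool:
    """Return True when the first cell of the first row counts as a match.

    Hypothetical helper: an empty result set means "keep waiting".
    """
    if not rows:
        return False
    first_row = rows[0]
    if not first_row:
        return False
    # Assumed falsy set, following the rule described for SqlSensor.
    return first_row[0] not in (0, "0", "", None)
```

With the `SELECT 1 ... LIMIT 1` query above, an empty result keeps the sensor poking and a single `1` row completes it.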
**Option B (follow-up): ergonomic window-existence sensor, modelled on
`BigQueryTablePartitionExistenceSensor`.** Removes the boilerplate of
hand-writing the `SELECT ... LIMIT 1` query.
```python
from airflow.providers.influxdb.sensors.influxdb3 import (
    InfluxDB3MeasurementWindowSensor,
)

wait_for_upstream = InfluxDB3MeasurementWindowSensor(
    task_id="wait_for_upstream",
    influxdb3_conn_id="influxdb3_ro",
    measurement="events",
    window_start="{{ data_interval_start }}",
    window_end="{{ data_interval_end }}",
    deferrable=True,
    poke_interval=60,
    timeout=3600,
)
```
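One way Option B might derive its query from the three parameters is sketched below. `build_window_query` is a hypothetical helper, and the quote-doubling is an assumption about how to keep values inside their identifier or literal. A real implementation would likely prefer parameterized queries if the client supports them.

```python
def build_window_query(measurement: str, window_start: str, window_end: str) -> str:
    """Build a LIMIT 1 existence query for the half-open window [start, end).

    Hypothetical helper, not part of any existing provider API.
    """
    # Double embedded quotes so values cannot break out of the identifier or literal.
    table = measurement.replace('"', '""')
    start = window_start.replace("'", "''")
    end = window_end.replace("'", "''")
    return (
        f'SELECT 1 FROM "{table}" '
        f"WHERE time >= '{start}' AND time < '{end}' "
        "LIMIT 1"
    )
```

The generated statement has the same shape as the hand-written query in Option A, so Option B could be a thin layer over the generic sensor.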
Class names (`InfluxDB3Sensor`, `InfluxDB3MeasurementWindowSensor`,
`InfluxDB3PartitionExistenceSensor`, etc.) and parameter shapes are
placeholders open to maintainer preference.
#### Acceptance criteria
- [ ] At least the generic SQL-truthy sensor (Option A) landed in
`airflow.providers.influxdb.sensors.influxdb3`. The window-existence variant
(Option B) can ship in the same PR or as a follow-up.
- [ ] Deferrable variant from day one (`deferrable=True` parameter, trigger
class implementing the async polling loop). The trigger should use
`InfluxDBClient3.query_async()` (introduced in `influxdb3-python` 0.12.0) so
polling does not hold worker slots. The provider's current
`influxdb3-python>=0.7.0` lower bound will need to be bumped to `>=0.12.0`.
- [ ] Unit tests covering match-found, no-match-timeout, and the deferrable
path.
- [ ] Integration test against a local InfluxDB 3 instance (or equivalent).
- [ ] Provider documentation with usage examples.
#### Scope and tradeoffs
`InfluxDBClient3.query_async()` is a `loop.run_in_executor(None, ...)`
wrapper around the blocking Apache Arrow Flight calls (see
`influxdb3-python/influxdb_client_3/query/query_api.py`). The interface is a
proper coroutine and is suitable for use in a Trigger, but concurrency in the
Triggerer is bounded by the executor's thread pool rather than by native async
IO. Native async over Arrow Flight would be a future improvement on the
InfluxData client side and is not a blocker for this issue. As noted in the
acceptance criteria, using `query_async` requires bumping the provider's
`influxdb3-python>=0.7.0` lower bound to `>=0.12.0`.
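
To make the tradeoff concrete, here is a minimal sketch of a polling loop built on the same `run_in_executor` pattern. It does not use the InfluxDB client at all: `poll_until_rows` and its `blocking_query` argument are hypothetical stand-ins for a trigger's loop and the blocking Flight call.

```python
import asyncio
import time
from typing import Callable, Sequence


async def poll_until_rows(
    blocking_query: Callable[[], Sequence],
    poke_interval: float,
    timeout: float,
) -> bool:
    """Poll a blocking query from a coroutine until it returns rows or times out."""
    loop = asyncio.get_running_loop()
    deadline = time.monotonic() + timeout
    while True:
        # The blocking call runs on the loop's default thread pool: the event
        # loop stays free, but each in-flight query occupies one pool thread.
        rows = await loop.run_in_executor(None, blocking_query)
        if rows:
            return True
        # Give up if the next poke would start after the deadline.
        if time.monotonic() + poke_interval > deadline:
            return False
        await asyncio.sleep(poke_interval)
```

While a sensor sleeps between pokes it holds no thread. The thread-pool bound only applies to queries that are executing at the same moment.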
### Use case/motivation
Time-series pipelines built on InfluxDB 3 frequently have inter-DAG data
dependencies. One DAG writes a daily aggregate to a measurement; a downstream
DAG reads from that measurement to compute a higher-level aggregate. Today the
only options are:
1. Fixed-delay scheduling. Schedule the downstream DAG to run N hours after
the upstream, hoping the upstream finishes in time. Brittle: if the upstream
runs long, the downstream reads stale or missing data; if it finishes early,
the downstream slot sits idle.
2. Hand-rolled `PythonSensor`. Every team writes a small wrapper around
`InfluxDB3Hook` to poll for data presence. Duplicated effort, inconsistent
semantics, not deferrable by default.
A dedicated sensor fixes both.
Example: a downstream DAG aggregates data written to an InfluxDB measurement
by one or more upstream DAGs. Today, the downstream DAG is scheduled with a
fixed delay (often hours) after the upstream to give the writes time to land,
with the failure modes described in option 1. A dedicated sensor that polls
the upstream measurement would let the downstream DAG start as soon as the data
is actually present, rather than relying on a fixed offset.
### Related issues
- Original InfluxDB 3 support: #58929
- Companion proposal: deferrable variant of `InfluxDB3Operator`: #67107
- Generic SQL precedent: `airflow/providers/common/sql/sensors/sql.py`
- Kafka deferrable sensor precedent:
`airflow/providers/apache/kafka/sensors/kafka.py`
- BigQuery partition existence precedent:
`airflow/providers/google/cloud/sensors/bigquery.py`
### Are you willing to submit a PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)