arpitrathore opened a new issue, #67109:
URL: https://github.com/apache/airflow/issues/67109

   ### Description
   
   The InfluxDB 3 support added in #58929 provides `InfluxDB3Hook` and 
`InfluxDB3Operator` but no dedicated sensor. Today, users who need to gate 
downstream tasks on the arrival of data in an InfluxDB 3 measurement have to 
wrap `InfluxDB3Hook` in a `PythonSensor`, which works but lacks the 
discoverability, deferrable semantics, and consistent API of a dedicated sensor.
   
   This issue proposes adding a dedicated sensor (exact class name and API to 
be decided in review) to the influxdb provider, following the pattern 
established by:
   
   - `SqlSensor` (`airflow.providers.common.sql.sensors.sql`). Generic 
SQL-truthy polling.
   - `AwaitMessageSensor` (`airflow.providers.apache.kafka.sensors.kafka`). 
Deferrable Kafka message wait.
   - `BigQueryTablePartitionExistenceSensor` 
(`airflow.providers.google.cloud.sensors.bigquery`). Partition existence check.
   - `HivePartitionSensor` and `NamedHivePartitionSensor` 
(`airflow.providers.apache.hive.sensors.hive_partition`).
   
   #### Proposed API
   
   Two complementary designs. Final naming and signatures are open for 
discussion in review.
   
   **Option A (recommended first cut): generic SQL-truthy sensor, modelled on 
`SqlSensor`.** Smallest delivery, broadest applicability.
   
   ```python
   from airflow.providers.influxdb.sensors.influxdb3 import InfluxDB3Sensor
   
   wait_for_data = InfluxDB3Sensor(
       task_id="wait_for_data",
       influxdb3_conn_id="influxdb3_ro",
       sql="""
           SELECT 1 FROM "events"
           WHERE time >= '{{ data_interval_start }}'
             AND time <  '{{ data_interval_end }}'
           LIMIT 1
       """,
       deferrable=True,
       poke_interval=60,
       timeout=3600,
   )
   ```
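
For reference, "SQL-truthy" in the `SqlSensor` sense roughly means the poke succeeds once the query returns at least one row whose first cell is truthy. A minimal sketch of that check, assuming the hook hands back rows as a list of tuples; the helper name `first_cell_is_truthy` is hypothetical, and the eventual sensor may need to convert Arrow or pandas results first:

```python
from typing import Any, Sequence


def first_cell_is_truthy(rows: Sequence[Sequence[Any]]) -> bool:
    """Return True when the first cell of the first row is truthy."""
    if not rows:
        # No rows yet: the data has not landed, so the sensor keeps poking.
        return False
    return bool(rows[0][0])


print(first_cell_is_truthy([]))      # False
print(first_cell_is_truthy([(1,)]))  # True
print(first_cell_is_truthy([(0,)]))  # False
```

With the `SELECT 1 ... LIMIT 1` query above, the result is either empty (keep waiting) or a single row containing `1` (proceed).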
   
   **Option B (follow-up): ergonomic window-existence sensor, modelled on 
`BigQueryTablePartitionExistenceSensor`.** Removes the boilerplate of 
hand-writing the `SELECT ... LIMIT 1` query.
   
   ```python
   from airflow.providers.influxdb.sensors.influxdb3 import InfluxDB3MeasurementWindowSensor
   
   wait_for_upstream = InfluxDB3MeasurementWindowSensor(
       task_id="wait_for_upstream",
       influxdb3_conn_id="influxdb3_ro",
       measurement="events",
       window_start="{{ data_interval_start }}",
       window_end="{{ data_interval_end }}",
       deferrable=True,
       poke_interval=60,
       timeout=3600,
   )
   ```
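
One way Option B could generate the Option A query internally is sketched below. Both helper names (`quote_identifier`, `build_window_query`) are hypothetical, and the string escaping is only illustrative; a real implementation would probably prefer whatever parameter binding the client offers.

```python
def quote_identifier(name: str) -> str:
    """Double-quote a SQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_window_query(measurement: str, window_start: str, window_end: str) -> str:
    """Build an existence query over the half-open window [start, end)."""
    # Timestamps are embedded as string literals, so single quotes are doubled.
    start = window_start.replace("'", "''")
    end = window_end.replace("'", "''")
    return (
        f"SELECT 1 FROM {quote_identifier(measurement)} "
        f"WHERE time >= '{start}' AND time < '{end}' "
        "LIMIT 1"
    )


print(build_window_query("events", "2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z"))
```

The half-open window mirrors the `>=` / `<` comparison in Option A, so consecutive data intervals do not double-count a boundary timestamp.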
   
   Class names (`InfluxDB3Sensor`, `InfluxDB3MeasurementWindowSensor`, 
`InfluxDB3PartitionExistenceSensor`, etc.) and parameter shapes are 
placeholders open to maintainer preference.
   
   #### Acceptance criteria
   
   - [ ] At least the generic SQL-truthy sensor (Option A) lands in 
`airflow.providers.influxdb.sensors.influxdb3`. The window-existence variant 
(Option B) can ship in the same PR or as a follow-up.
   - [ ] Deferrable variant from day one (`deferrable=True` parameter, trigger 
class implementing the async polling loop). The trigger should use 
`InfluxDBClient3.query_async()` (introduced in `influxdb3-python` 0.12.0) so 
polling does not hold worker slots. The provider's current 
`influxdb3-python>=0.7.0` lower bound will need to be bumped to `>=0.12.0`.
   - [ ] Unit tests covering match-found, no-match-timeout, and the deferrable 
path.
   - [ ] Integration test against a local InfluxDB 3 instance (or equivalent).
   - [ ] Provider documentation with usage examples.
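
The async polling loop mentioned in the deferrable criterion could look roughly like the sketch below. It is deliberately framework-free: a real trigger would subclass Airflow's `BaseTrigger` and yield `TriggerEvent` objects, and the timeout is often enforced by the deferral itself rather than inside the loop. The name `poll_until_rows` and the event payload shape are assumptions for illustration, and `fake_query` stands in for the real client call.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Sequence

QueryFn = Callable[[str], Awaitable[Sequence[Any]]]


async def poll_until_rows(
    query: QueryFn, sql: str, poke_interval: float, timeout: float
) -> AsyncIterator[dict]:
    """Yield exactly one event: success when rows appear, timeout otherwise."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        rows = await query(sql)
        if rows:
            yield {"status": "success"}
            return
        if loop.time() + poke_interval > deadline:
            yield {"status": "timeout"}
            return
        # asyncio.sleep yields control, so other triggers keep running.
        await asyncio.sleep(poke_interval)


async def demo() -> list:
    calls = 0

    async def fake_query(sql: str) -> list:
        # Stand-in for the real client: data "arrives" on the third poll.
        nonlocal calls
        calls += 1
        return [(1,)] if calls >= 3 else []

    return [e async for e in poll_until_rows(fake_query, "SELECT 1", 0.01, 1.0)]


events = asyncio.run(demo())
print(events)
```

The match-found and no-match-timeout unit tests listed above map directly onto the two branches that yield an event.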
   
   #### Scope and tradeoffs
   
   `InfluxDBClient3.query_async()` is a `loop.run_in_executor(None, ...)` 
wrapper around the blocking Apache Arrow Flight calls (see 
`influxdb3-python/influxdb_client_3/query/query_api.py`). The interface is a 
proper coroutine and is suitable for use in a Trigger, but each in-flight query 
occupies a thread, so concurrency in the Triggerer is bounded by the event 
loop's default executor thread pool rather than by native async IO. Native 
async over Arrow Flight is a future improvement on the InfluxData client side, 
not a blocker for this issue. As noted in the acceptance criteria, using 
`query_async` requires raising the provider's `influxdb3-python>=0.7.0` lower 
bound to `>=0.12.0`.
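
The thread-pool bound described above can be seen in a small stdlib-only experiment. Here `blocking_query` is a stand-in for the blocking Flight call (it just sleeps), and an explicit `ThreadPoolExecutor` is passed instead of `None` so the pool size is visible; none of this is the client's actual code.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_query(sql: str) -> list:
    """Stand-in for a blocking Arrow Flight call."""
    time.sleep(0.2)
    return [(1,)]


async def query_async(executor: ThreadPoolExecutor, sql: str) -> list:
    """Run the blocking call on a worker thread and await its result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_query, sql)


async def run_batch(workers: int, queries: int) -> float:
    """Return wall-clock seconds needed to finish `queries` concurrent polls."""
    with ThreadPoolExecutor(max_workers=workers) as executor:
        started = time.perf_counter()
        await asyncio.gather(
            *(query_async(executor, "SELECT 1") for _ in range(queries))
        )
        return time.perf_counter() - started


narrow = asyncio.run(run_batch(workers=2, queries=4))  # two waves of two
wide = asyncio.run(run_batch(workers=4, queries=4))    # one wave of four
print(f"2 workers: {narrow:.2f}s, 4 workers: {wide:.2f}s")
```

With two workers the four polls complete in two waves, so the batch takes about twice as long as with four workers. The same effect would apply to many sensors polling concurrently in one Triggerer.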
   
   ### Use case/motivation
   
   Time-series pipelines built on InfluxDB 3 frequently have inter-DAG data 
dependencies. One DAG writes a daily aggregate to a measurement; a downstream 
DAG reads from that measurement to compute a higher-level aggregate. Today the 
only options are:
   
   1. Fixed-delay scheduling. Schedule the downstream DAG to run N hours after 
the upstream, hoping the upstream finishes in time. Brittle: if the upstream 
runs long, the downstream reads stale or missing data; if it finishes early, 
the downstream slot sits idle.
   2. Hand-rolled `PythonSensor`. Every team writes a small wrapper around 
`InfluxDB3Hook` to poll for data presence. Duplicated effort, inconsistent 
semantics, not deferrable by default.
   
   A dedicated sensor fixes both.
   
   Example: a downstream DAG aggregates data written to an InfluxDB measurement 
by one or more upstream DAGs. Today it is scheduled with a fixed delay (often 
hours) after the upstream to give the writes time to land, with the failure 
modes described in option 1. A dedicated sensor that polls the upstream 
measurement would let the downstream DAG start as soon as the data is actually 
present, rather than relying on a fixed offset.
   
   ### Related issues
   
   - Original InfluxDB 3 support: #58929
   - Companion proposal: deferrable variant of `InfluxDB3Operator`: #67107
   - Generic SQL precedent: `airflow/providers/common/sql/sensors/sql.py`
   - Kafka deferrable sensor precedent: 
`airflow/providers/apache/kafka/sensors/kafka.py`
   - BigQuery partition existence precedent: 
`airflow/providers/google/cloud/sensors/bigquery.py`
   
   ### Are you willing to submit a PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

