arpitrathore opened a new issue, #67107:
URL: https://github.com/apache/airflow/issues/67107
### Description
The `InfluxDB3Operator` added in #58929 currently executes queries
synchronously, which means each task holds an Airflow worker slot for the
entire duration of the query. For long-running analytics or aggregation queries
over InfluxDB 3 clusters, this prevents the worker from picking up other tasks
while the query runs on the database.
This issue proposes adding a `deferrable: bool = False` parameter (and a
separate trigger class) to `InfluxDB3Operator`, following the pattern already
established in:
- `SnowflakeSqlApiOperator` (deferrable variant with
`SnowflakeSqlApiTrigger`)
- `BigQueryInsertJobOperator` (deferrable mode with
`BigQueryInsertJobTrigger`)
- `RedshiftDataOperator` (deferrable with `RedshiftDataTrigger`)
#### Proposed API
```python
from airflow.providers.influxdb.operators.influxdb3 import InfluxDB3Operator

InfluxDB3Operator(
    task_id="aggregate_metrics",
    sql="SELECT ... FROM measurement WHERE ...",
    influxdb3_conn_id="influxdb3_default",
    deferrable=True,
    poll_interval=30,
)
```
```
When `deferrable=True`, `execute()` should:
1. Submit the query to InfluxDB 3 using `InfluxDBClient3.query_async()`, the
asyncio coroutine exposed by `influxdb3-python`. (`query_async` was introduced
in `influxdb3-python` 0.12.0; the influxdb provider currently declares
`influxdb3-python>=0.7.0`, so the deferrable implementation will need to bump
the lower bound to `>=0.12.0`.) For very large result sets, the synchronous
`query(mode="reader")` Apache Arrow Flight stream is also available if a
streaming result interface is preferred.
2. Call `self.defer(trigger=InfluxDB3QueryTrigger(...),
method_name="execute_complete")`.
3. The trigger polls the query state on the InfluxDB cluster at
`poll_interval` and yields a `TriggerEvent` when the query finishes or fails.
4. `execute_complete()` retrieves the result and pushes it to XCom in the
same shape as the sync path.
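The sketch below walks through steps 1 to 4 and runs without Airflow or `influxdb3-python` installed. `TriggerEvent` and `TaskDeferred` are simplified stand-ins for Airflow's classes of the same names. `FakeInfluxClient`, `InfluxDB3OperatorSketch`, `ROWS`, and `run_task` are hypothetical names invented for illustration. `execute()` is synchronous and cannot `await`, so this sketch awaits `query_async()` inside the trigger's `run()` rather than in `execute()`. It also leaves out `poll_interval`, because it awaits a single coroutine instead of polling a server-side query handle.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator

ROWS = [{"host": "a", "mean": 1.5}]  # hypothetical query result


# Stand-ins so the sketch runs without Airflow or influxdb3-python installed.
@dataclass
class TriggerEvent:
    payload: dict[str, Any]


class TaskDeferred(Exception):
    """Mimics what BaseOperator.defer() raises to hand the task to the Triggerer."""

    def __init__(self, trigger: Any, method_name: str) -> None:
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name


class FakeInfluxClient:
    """Hypothetical stand-in for InfluxDBClient3 (query / query_async only)."""

    def query(self, sql: str) -> list[dict[str, Any]]:
        return ROWS

    async def query_async(self, sql: str) -> list[dict[str, Any]]:
        # Coroutine wrapper over the blocking call, as described in the issue.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.query, sql)


class InfluxDB3QueryTrigger:
    def __init__(self, sql: str) -> None:
        self.sql = sql

    async def run(self) -> AsyncIterator[TriggerEvent]:
        # The coroutine is awaited here, in the Triggerer, not in execute().
        try:
            rows = await FakeInfluxClient().query_async(self.sql)
            yield TriggerEvent({"status": "success", "rows": rows})
        except Exception as exc:
            yield TriggerEvent({"status": "error", "message": str(exc)})


class InfluxDB3OperatorSketch:
    def __init__(self, sql: str, deferrable: bool = False) -> None:
        self.sql = sql
        self.deferrable = deferrable

    def execute(self, context: dict) -> list[dict[str, Any]]:
        if not self.deferrable:
            return FakeInfluxClient().query(self.sql)  # sync path holds the slot
        raise TaskDeferred(InfluxDB3QueryTrigger(self.sql), "execute_complete")

    def execute_complete(self, context: dict, event: dict[str, Any]) -> list[dict[str, Any]]:
        if event["status"] != "success":
            raise RuntimeError(event["message"])
        return event["rows"]  # same shape as the sync path, so XCom is unchanged


def run_task(op: InfluxDB3OperatorSketch) -> list[dict[str, Any]]:
    """Toy scheduler + Triggerer: resume the task with the trigger's first event."""
    context: dict = {}
    try:
        return op.execute(context)
    except TaskDeferred as deferred:

        async def first_event() -> TriggerEvent:
            agen = deferred.trigger.run()
            event = await agen.__anext__()
            await agen.aclose()
            return event

        event = asyncio.run(first_event())
        return getattr(op, deferred.method_name)(context, event.payload)
```

`run_task` collapses the handoff into one function. In Airflow the trigger would run in the separate Triggerer process, and the task would resume on a worker once the event fires.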
#### Acceptance criteria
- [ ] New `deferrable: bool` parameter on `InfluxDB3Operator` (default
`False` for backward compatibility).
- [ ] New `InfluxDB3QueryTrigger` (or similar) implementing the async
polling loop using `influxdb3-python`'s async client.
- [ ] Unit tests covering both `deferrable=True` and `deferrable=False` code
paths.
- [ ] Integration test exercising the trigger end-to-end against a local
InfluxDB 3 instance (or mocked equivalent if a real one is impractical in CI).
- [ ] Provider documentation updated with a `deferrable=True` example.
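For the unit-test criteria, the trigger can be exercised without a live InfluxDB 3 instance by patching client construction and substituting an `AsyncMock`. The trigger below is a hypothetical minimal version. `_get_client()` is an invented seam that a real implementation might back with `InfluxDB3Hook`, and the `serialize()` classpath is made up. Keeping only plain values (`sql`, `conn_id`) in `serialize()` matters because Airflow stores trigger kwargs and rebuilds the trigger inside the Triggerer.

```python
from __future__ import annotations

import unittest
from typing import Any, AsyncIterator
from unittest import mock


class InfluxDB3QueryTrigger:
    """Hypothetical minimal trigger used only to demonstrate the test shape."""

    def __init__(self, sql: str, conn_id: str = "influxdb3_default") -> None:
        self.sql = sql
        self.conn_id = conn_id

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Only plain kwargs, so the trigger can be rebuilt from stored state.
        kwargs = {"sql": self.sql, "conn_id": self.conn_id}
        return ("example.triggers.InfluxDB3QueryTrigger", kwargs)

    def _get_client(self) -> Any:
        # Hypothetical seam: a real version would build the client from conn_id.
        raise NotImplementedError

    async def run(self) -> AsyncIterator[dict[str, Any]]:
        try:
            rows = await self._get_client().query_async(self.sql)
            yield {"status": "success", "rows": rows}
        except Exception as exc:
            yield {"status": "error", "message": str(exc)}


async def first_event(trigger: InfluxDB3QueryTrigger) -> dict[str, Any]:
    # Pull exactly one event, then close the async generator cleanly.
    agen = trigger.run()
    try:
        return await agen.__anext__()
    finally:
        await agen.aclose()


class TestInfluxDB3QueryTrigger(unittest.IsolatedAsyncioTestCase):
    def _mock_client(self, **query_async_kwargs: Any) -> mock.Mock:
        client = mock.Mock()
        client.query_async = mock.AsyncMock(**query_async_kwargs)
        return client

    async def test_success_event(self) -> None:
        trigger = InfluxDB3QueryTrigger("SELECT 1")
        client = self._mock_client(return_value=[{"v": 1}])
        with mock.patch.object(trigger, "_get_client", return_value=client):
            event = await first_event(trigger)
        self.assertEqual(event, {"status": "success", "rows": [{"v": 1}]})
        client.query_async.assert_awaited_once_with("SELECT 1")

    async def test_error_event(self) -> None:
        trigger = InfluxDB3QueryTrigger("SELECT 1")
        client = self._mock_client(side_effect=TimeoutError("flight timeout"))
        with mock.patch.object(trigger, "_get_client", return_value=client):
            event = await first_event(trigger)
        self.assertEqual(event, {"status": "error", "message": "flight timeout"})

    def test_serialize_round_trip(self) -> None:
        trigger = InfluxDB3QueryTrigger("SELECT 1", conn_id="my_influx")
        _, kwargs = trigger.serialize()
        self.assertEqual(InfluxDB3QueryTrigger(**kwargs).serialize(), trigger.serialize())
```

These run under `python -m pytest` or `python -m unittest`. pytest collects `unittest.TestCase` subclasses, including `IsolatedAsyncioTestCase`.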
#### Scope and tradeoffs
1. `InfluxDBClient3.query_async()` is a `loop.run_in_executor(None, ...)`
wrapper around the blocking Apache Arrow Flight calls (see
`influxdb3-python/influxdb_client_3/query/query_api.py`). It exposes a proper
coroutine interface and is suitable for use in a Trigger. However, concurrency
in the Triggerer is bounded by the executor's thread pool rather than by native
async IO. Native async over Arrow Flight would be a future improvement on the
InfluxData client side and is not a blocker for this issue. As noted above, the
provider's `influxdb3-python` lower bound will need to move from `>=0.7.0` to
`>=0.12.0`, the release that introduced `query_async`.
2. The deferrable variant frees a worker slot while the query runs, but the
result still flows back through the operator and into XCom. This is most useful
for long-running queries with small-to-moderate result sets (aggregations,
freshness probes, dashboard queries). For very large extracts, the existing
`InfluxDB3Hook` + Python task pattern remains the right answer, and this issue
does not aim to replace it.
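The executor-bound concurrency in point 1 can be seen in a stdlib-only sketch. `blocking_flight_query` is a hypothetical stand-in that sleeps instead of calling Arrow Flight. The local `query_async` reproduces only the `run_in_executor(None, ...)` shape described above, not the library's actual signature. The explicit `ThreadPoolExecutor` plays the role of the default pool used by the Triggerer's event loop.

```python
from __future__ import annotations

import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyProbe:
    """Records how many simulated Flight calls run at the same time."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def blocking_flight_query(self, sql: str) -> str:
        # Hypothetical stand-in for a blocking Arrow Flight call.
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        time.sleep(0.05)  # pretend the database is working
        with self._lock:
            self.active -= 1
        return f"rows for {sql}"


async def query_async(probe: ConcurrencyProbe, sql: str) -> str:
    # Coroutine interface over a blocking call; None selects the loop's
    # default executor, so each in-flight query occupies one pool thread.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, probe.blocking_flight_query, sql)


async def run_queries(n: int, max_workers: int) -> tuple[list[str], int]:
    loop = asyncio.get_running_loop()
    # Stands in for the Triggerer loop's default thread pool.
    loop.set_default_executor(ThreadPoolExecutor(max_workers=max_workers))
    probe = ConcurrencyProbe()
    results = await asyncio.gather(
        *(query_async(probe, f"SELECT {i}") for i in range(n))
    )
    return list(results), probe.peak


results, peak = asyncio.run(run_queries(8, max_workers=2))
print(len(results), peak)
```

With 8 queries and `max_workers=2`, at most two simulated Flight calls are in progress at once; the rest wait for a free thread. Python's default `ThreadPoolExecutor` sizes itself to `min(32, os.cpu_count() + 4)` workers (Python 3.8+), so long-running InfluxDB queries could share that pool with any other trigger in the same Triggerer that offloads blocking work through `run_in_executor(None, ...)`.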
### Use case/motivation
Analytics queries on InfluxDB Cloud Dedicated or InfluxDB 3 Enterprise
routinely take minutes for windowed aggregations across high-cardinality
measurements. In a constrained worker pool, holding a slot per long-running
query limits the parallelism the rest of the DAG fleet can achieve. A
deferrable variant releases the worker slot back to the pool the moment the
query is submitted to InfluxDB. The Triggerer then polls the query state and
resumes the task when the result is ready.
This matches established Airflow patterns and unblocks teams that want to
standardise on the new `InfluxDB3Operator` without losing worker-slot
efficiency.
### Related issues
- Original InfluxDB 3 support: #58929
- Snowflake deferrable reference:
`airflow/providers/snowflake/operators/snowflake.py`,
`airflow/providers/snowflake/triggers/snowflake_trigger.py`
- BigQuery deferrable reference:
`airflow/providers/google/cloud/operators/bigquery.py`,
`airflow/providers/google/cloud/triggers/bigquery.py`
### Are you willing to submit a PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)