arpitrathore opened a new issue, #67107:
URL: https://github.com/apache/airflow/issues/67107

   ### Description
   
   The `InfluxDB3Operator` added in #58929 currently executes queries 
synchronously, which means each task holds an Airflow worker slot for the 
entire duration of the query. For long-running analytics or aggregation queries 
over InfluxDB 3 clusters, this prevents the worker from picking up other tasks 
while the query runs on the database.
   
   This issue proposes adding a `deferrable: bool = False` parameter (and a 
separate trigger class) to `InfluxDB3Operator`, following the pattern already 
established in:
   
   - `SnowflakeSqlApiOperator` (deferrable variant with 
`SnowflakeSqlApiTrigger`)
   - `BigQueryInsertJobOperator` (deferrable mode with 
`BigQueryInsertJobTrigger`)
   - `RedshiftDataOperator` (deferrable with `RedshiftDataTrigger`)
   
   #### Proposed API
   
   ```python
   from airflow.providers.influxdb.operators.influxdb3 import InfluxDB3Operator
   
   InfluxDB3Operator(
       task_id="aggregate_metrics",
       sql="SELECT ... FROM measurement WHERE ...",
       influxdb3_conn_id="influxdb3_default",
       deferrable=True,
       poll_interval=30,
   )
   ```
   
   When `deferrable=True`, `execute()` should:
   
   1. Submit the query to InfluxDB 3 using `InfluxDBClient3.query_async()`, the 
asyncio coroutine exposed by `influxdb3-python`. (`query_async` was introduced 
in `influxdb3-python` 0.12.0; the influxdb provider currently declares 
`influxdb3-python>=0.7.0`, so the deferrable implementation will need to bump 
the lower bound to `>=0.12.0`.) For very large result sets, the synchronous 
`query(mode="reader")` call, which returns an Apache Arrow Flight stream 
reader, is also available if a streaming result interface is preferred.
   2. Call `self.defer(trigger=InfluxDB3QueryTrigger(...), 
method_name="execute_complete")`.
   3. The trigger polls the query state on the InfluxDB cluster at 
`poll_interval` and yields a `TriggerEvent` when the query finishes or fails.
   4. `execute_complete()` retrieves the result and pushes it to XCom in the 
same shape as the sync path.
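
The trigger and `execute_complete()` steps above can be sketched with the standard library alone. This is a minimal illustration, not the provider's implementation: `TriggerEvent` is a stand-in for `airflow.triggers.base.TriggerEvent`, `fake_query` stands in for a call such as `InfluxDBClient3.query_async()`, and the payload keys (`status`, `rows`, `message`) are hypothetical. It also assumes the trigger awaits the query coroutine itself rather than polling a separate query-status endpoint.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Awaitable, Callable


@dataclass
class TriggerEvent:
    """Stand-in for airflow.triggers.base.TriggerEvent (assumption)."""

    payload: dict[str, Any]


class InfluxDB3QueryTrigger:
    """Hypothetical trigger: awaits the query and reports the outcome."""

    def __init__(
        self,
        sql: str,
        run_query: Callable[[str], Awaitable[list[dict[str, Any]]]],
    ) -> None:
        self.sql = sql
        # Stand-in for a coroutine such as InfluxDBClient3.query_async.
        self.run_query = run_query

    async def run(self) -> AsyncIterator[TriggerEvent]:
        try:
            rows = await self.run_query(self.sql)
        except Exception as exc:
            # Failures are reported as an event so the task can resume and fail.
            yield TriggerEvent({"status": "error", "message": str(exc)})
            return
        yield TriggerEvent({"status": "success", "rows": rows})


def execute_complete(event: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the rows (the value that would go to XCom) or raise on failure."""
    if event["status"] != "success":
        raise RuntimeError(f"InfluxDB 3 query failed: {event['message']}")
    return event["rows"]


async def fake_query(sql: str) -> list[dict[str, Any]]:
    """Fake query coroutine used only for this sketch."""
    await asyncio.sleep(0)
    if "bad" in sql:
        raise ValueError("syntax error")
    return [{"host": "a", "mean_cpu": 0.5}]


async def first_event(trigger: InfluxDB3QueryTrigger) -> dict[str, Any]:
    """Drive the trigger the way the Triggerer would and return the first payload."""
    async for event in trigger.run():
        return event.payload
    raise RuntimeError("trigger finished without yielding an event")
```

A real trigger would subclass `BaseTrigger` and implement `serialize()`, so it would carry a connection ID and the SQL text rather than a callable, and the event payload would need to be serializable.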
   
   #### Acceptance criteria
   
   - [ ] New `deferrable: bool` parameter on `InfluxDB3Operator` (default 
`False` for backward compatibility).
   - [ ] New `InfluxDB3QueryTrigger` (or similar) implementing the async 
polling loop using `influxdb3-python`'s async client.
   - [ ] Unit tests covering both `deferrable=True` and `deferrable=False` code 
paths.
   - [ ] Integration test exercising the trigger end-to-end against a local 
InfluxDB 3 instance (or mocked equivalent if a real one is impractical in CI).
   - [ ] Provider documentation updated with a `deferrable=True` example.
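
As a rough illustration of the unit-test criterion, the sketch below shows one possible shape for tests covering both code paths. Everything in it is hypothetical: `FakeInfluxDB3Operator` is a simplified stand-in rather than the real operator, and `TaskDeferred` mimics `airflow.exceptions.TaskDeferred`, the exception that `BaseOperator.defer()` raises to hand the task to the Triggerer.

```python
import unittest


class TaskDeferred(Exception):
    """Stand-in for airflow.exceptions.TaskDeferred (assumption)."""


class FakeInfluxDB3Operator:
    """Hypothetical, simplified operator used only to show the test shape."""

    def __init__(self, sql, deferrable=False):
        self.sql = sql
        self.deferrable = deferrable

    def _run_sync(self):
        # Stand-in for the existing synchronous query path.
        return [{"value": 1}]

    def execute(self, context):
        if self.deferrable:
            # The real operator would call self.defer(trigger=..., method_name=...).
            raise TaskDeferred("execute_complete")
        return self._run_sync()

    def execute_complete(self, context, event):
        if event["status"] != "success":
            raise RuntimeError(event["message"])
        return event["rows"]


class TestDeferrablePaths(unittest.TestCase):
    def test_sync_path_returns_rows(self):
        op = FakeInfluxDB3Operator("SELECT 1")
        self.assertEqual(op.execute({}), [{"value": 1}])

    def test_deferrable_path_defers(self):
        op = FakeInfluxDB3Operator("SELECT 1", deferrable=True)
        with self.assertRaises(TaskDeferred):
            op.execute({})

    def test_execute_complete_matches_sync_shape(self):
        op = FakeInfluxDB3Operator("SELECT 1", deferrable=True)
        event = {"status": "success", "rows": [{"value": 1}]}
        self.assertEqual(op.execute_complete({}, event), op._run_sync())

    def test_execute_complete_raises_on_error(self):
        op = FakeInfluxDB3Operator("SELECT 1", deferrable=True)
        with self.assertRaises(RuntimeError):
            op.execute_complete({}, {"status": "error", "message": "boom"})


def run_tests():
    """Run the sketch's test case and return the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestDeferrablePaths)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

The third test is the one that pins down the "same shape as the sync path" requirement from the proposed API.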
   
   #### Scope and tradeoffs
   
   1. `InfluxDBClient3.query_async()` is a `loop.run_in_executor(None, ...)` 
wrapper around the blocking Apache Arrow Flight calls (see 
`influxdb3-python/influxdb_client_3/query/query_api.py`). It is a proper 
coroutine, so it is suitable for use in a Trigger, but concurrency in the 
Triggerer is bounded by the executor's thread pool rather than by native async 
I/O. Native async over Arrow Flight is a future improvement on the InfluxData 
client side, not a blocker for this issue. As noted under the proposed API, 
using `query_async` means raising the provider's `influxdb3-python>=0.7.0` 
lower bound to `>=0.12.0`.
   2. The deferrable variant frees a worker slot while the query runs, but the 
result still flows back through the operator and into XCom. This is most useful 
for long-running queries with small-to-moderate result sets (aggregations, 
freshness probes, dashboard queries). For very large extracts, the existing 
`InfluxDB3Hook` + Python task pattern remains the right answer, and this issue 
does not aim to replace it.
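
To make the first tradeoff concrete, here is a small standard-library sketch of the `run_in_executor` wrapper pattern. It is not the `influxdb3-python` code: `ConcurrencyProbe.blocking_query` is a hypothetical stand-in for a blocking Arrow Flight call, and an explicit `ThreadPoolExecutor` is passed (instead of `None`, the loop's default executor) only so that the bound is visible.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyProbe:
    """Records how many blocking calls run at the same time."""

    def __init__(self):
        self._lock = threading.Lock()
        self._active = 0
        self.peak = 0

    def blocking_query(self, sql):
        # Stand-in for a blocking Arrow Flight call (assumption).
        with self._lock:
            self._active += 1
            self.peak = max(self.peak, self._active)
        time.sleep(0.05)
        with self._lock:
            self._active -= 1
        return f"rows for {sql}"


async def query_async(probe, executor, sql):
    """Coroutine wrapper: the blocking call runs on an executor thread."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, probe.blocking_query, sql)


async def run_many(n, max_workers):
    """Start n wrapped queries at once and report the peak thread concurrency."""
    probe = ConcurrencyProbe()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = await asyncio.gather(
            *(query_async(probe, executor, f"q{i}") for i in range(n))
        )
    return results, probe.peak
```

Calling `asyncio.run(run_many(6, 2))` awaits six coroutines at once, yet the probe never sees more than two blocking calls in flight, which is the thread-pool bound described above.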
   
   ### Use case/motivation
   
   Analytics queries on InfluxDB Cloud Dedicated or InfluxDB 3 Enterprise 
routinely take minutes for windowed aggregations across high-cardinality 
measurements. In a constrained worker pool, holding a slot per long-running 
query limits the parallelism the rest of the DAG fleet can achieve. A 
deferrable variant releases the worker slot back to the pool the moment the 
query is submitted to InfluxDB. The Triggerer then polls the query state and 
resumes the task when the result is ready.
   
   This matches established Airflow patterns and unblocks teams that want to 
standardise on the new `InfluxDB3Operator` without losing worker-slot 
efficiency.
   
   ### Related issues
   
   - Original InfluxDB 3 support: #58929
   - Snowflake deferrable reference: 
`airflow/providers/snowflake/operators/snowflake.py`, 
`airflow/providers/snowflake/triggers/snowflake_trigger.py`
   - BigQuery deferrable reference: 
`airflow/providers/google/cloud/operators/bigquery.py`, 
`airflow/providers/google/cloud/triggers/bigquery.py`
   
   ### Are you willing to submit a PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.