subramanya-dev opened a new pull request, #67793:
URL: https://github.com/apache/airflow/pull/67793

   Add coroutine-native `AsyncTask` / `AsyncClient` execution path alongside 
the existing synchronous one. Java users and existing tasks are completely 
unaffected.
   
   The sync `Client` wraps every coordinator call in `runBlocking`, tying up a 
thread per in-flight API call. `AsyncTask` + `AsyncClient` push the suspend 
boundary into the task body so tasks that use structured concurrency (e.g. 
parallel `async { client.getXCom(...) }` calls) cooperate with the coroutine 
scheduler instead of blocking threads.
   
   **New types**
   - `AsyncTask` — `suspend fun execute(context: Context, client: AsyncClient)`
   - `AsyncClient` (public SDK) — suspend mirror of `Client`
   - `execution.AsyncClient` (transport) — suspend mirror of `execution.Client`
   - `CoordinatorAsyncClient` — production impl; calls `comm.communicate()` 
directly without `runBlocking`
   
   **Dispatch**
   `Dag.tasks` is now `Map<String, TaskEntry>` where `TaskEntry` is a sealed 
class with `Sync` and `Async` variants. `TaskRunner` switches on the variant; 
async tasks are wrapped in a single top-level `runBlocking` so the 
coordinator-facing contract stays synchronous.
   
   **Annotation processor**
   `@Builder.Task` now works on `suspend fun` without any new annotation. 
Detection uses the standard technique: the Kotlin compiler appends a synthetic 
`kotlin.coroutines.Continuation` parameter to every suspend function's JVM 
descriptor. The processor erases and compares the last parameter's type, then 
strips it from the visible parameter list before generating XCom/client binding 
code.
   
   **What's unchanged**
   - `Task`, `Client`, `addTask()` — no modifications, no renames
   - Java source compatibility — `Task` is still the interface Java code 
implements
   - Existing DAGs and generated builder classes require no changes
   
   **Testing async tasks**
   Pass an `execution.AsyncClient` test double to the new `runTask(bundle, 
request, syncClient, asyncClient)` overload — no live `CoordinatorComm` needed.
   
   ---
   ##### Was generative AI tooling used to co-author this PR?
   - [ ] Yes (please specify the tool below)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to