andygrove opened a new pull request, #1388:
URL: https://github.com/apache/datafusion-ballista/pull/1388

   ## Summary
   
   Adds a `BallistaClientPool` that caches gRPC connections to executors, keyed by `(host, port)`. This avoids the overhead of establishing a new connection for every partition fetch during shuffle reads.
   
   ### Changes
   
   - Add `BallistaClientPool` struct in `client.rs` with:
     - `get_or_connect(host, port, max_message_size)` - returns the cached connection or creates a new one
     - `remove(host, port)` - removes a connection (useful for error recovery)
     - `clear()` - clears all cached connections
   - Add `global_client_pool()` function for shared pool access across all shuffle reads
   - Update `fetch_partition_remote()` in `shuffle_reader.rs` to use the connection pool
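
   The API surface listed above can be sketched as follows. This is a minimal sketch under several assumptions. `Client` is a hypothetical placeholder for `BallistaClient`. `max_message_size` and async connection setup are omitted. A single `std::sync::Mutex` guards the map for brevity, so the "connect" step runs under the lock, unlike the read/write-lock design described under Implementation Details. Backing `global_client_pool()` with `std::sync::OnceLock` is also an assumption, not necessarily what the PR does.

   ```rust
   use std::collections::HashMap;
   use std::sync::{Arc, Mutex, OnceLock};

   /// Hypothetical stand-in for `BallistaClient` (the real client wraps a gRPC channel).
   #[derive(Debug)]
   pub struct Client {
       pub addr: String,
   }

   /// Simplified pool keyed by (host, port). One Mutex guards the whole map for brevity,
   /// so the "connect" below runs under the lock (unlike the PR's RwLock design).
   #[derive(Default)]
   pub struct ClientPool {
       clients: Mutex<HashMap<(String, u16), Arc<Client>>>,
   }

   impl ClientPool {
       /// Returns the cached client for (host, port), creating one only on a cache miss.
       pub fn get_or_connect(&self, host: &str, port: u16) -> Arc<Client> {
           let mut clients = self.clients.lock().unwrap();
           clients
               .entry((host.to_string(), port))
               // Hypothetical "connect": it only records the address.
               .or_insert_with(|| Arc::new(Client { addr: format!("{host}:{port}") }))
               .clone()
       }

       /// Evicts one cached client, e.g. after a failed fetch, so the next call reconnects.
       pub fn remove(&self, host: &str, port: u16) {
           self.clients.lock().unwrap().remove(&(host.to_string(), port));
       }

       /// Evicts every cached client.
       pub fn clear(&self) {
           self.clients.lock().unwrap().clear();
       }
   }

   /// Process-wide pool shared by all callers, created lazily on first use.
   pub fn global_client_pool() -> &'static ClientPool {
       static POOL: OnceLock<ClientPool> = OnceLock::new();
       POOL.get_or_init(ClientPool::default)
   }
   ```

   Suppose a fetch fails on a pooled connection. The caller can call `remove` so that the next `get_or_connect` builds a fresh connection instead of handing back the broken one.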
   
   ### Implementation Details
   
   The pool uses a read-write lock pattern with `parking_lot::RwLock`:
   - **Fast path**: take a read lock to check for an existing connection (concurrent cache hits do not block each other)
   - **Slow path**: create the connection without holding the lock, then take a write lock to cache it
   
   Race handling: if multiple tasks try to connect to the same host simultaneously, each creates a connection but only one gets cached. The others detect this, return the cached connection, and drop their extra one. This case should be rare, and tolerating it avoids holding a lock during async connection establishment.
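
   The fast path, slow path, and race handling described above can be sketched as follows. This is a minimal sketch under stated assumptions. It uses `std::sync::RwLock` instead of `parking_lot::RwLock` so that it has no dependencies (parking_lot's `read()`/`write()` return guards directly, without `unwrap()`). The client type is a generic `C`. `connect` is a hypothetical synchronous closure standing in for the async gRPC connection. The signature is illustrative, not the PR's actual API.

   ```rust
   use std::collections::HashMap;
   use std::sync::{Arc, RwLock};

   /// Sketch of the read-lock fast path and write-lock slow path.
   /// `C` stands in for the client type; `connect` is a hypothetical synchronous
   /// stand-in for the async connection setup.
   pub struct ClientPool<C> {
       clients: RwLock<HashMap<(String, u16), Arc<C>>>,
   }

   impl<C> ClientPool<C> {
       pub fn new() -> Self {
           Self { clients: RwLock::new(HashMap::new()) }
       }

       pub fn get_or_connect<F>(&self, host: &str, port: u16, connect: F) -> Arc<C>
       where
           F: FnOnce() -> C,
       {
           let key = (host.to_string(), port);

           // Fast path: a shared read lock, so concurrent cache hits do not block each other.
           {
               let clients = self.clients.read().unwrap();
               if let Some(client) = clients.get(&key) {
                   return Arc::clone(client);
               }
           } // read lock released here

           // Slow path: connect with no lock held (the real code awaits here).
           let fresh = Arc::new(connect());

           // Take the write lock only to publish. If another task cached a client for
           // this key in the meantime, `or_insert` keeps that one and `fresh` is dropped,
           // closing the extra connection.
           let mut clients = self.clients.write().unwrap();
           let cached: &Arc<C> = clients.entry(key).or_insert(fresh);
           Arc::clone(cached)
       }
   }
   ```

   Publishing through `entry(key).or_insert(fresh)` is what makes the race benign. The first writer wins. Every later caller then receives that same cached `Arc`, whether it takes the fast path or loses the race on the slow path.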
   
   ### Motivation
   
   This addresses the TODO comment in `shuffle_reader.rs`:
   ```rust
   // TODO for shuffle client connections, we should avoid creating new connections again and again.
   // And we should also avoid to keep alive too many connections for long time.
   ```
   
   Establishing a connection involves a TCP handshake, TLS negotiation (if enabled), and HTTP/2 setup. Reusing connections eliminates this overhead for repeated fetches from the same executor.
   
   ## Note
   
   This PR was created with AI assistance using [Claude Code](https://claude.ai/code). All changes were reviewed and approved by a human maintainer.
   
   ## Test plan
   
   - [x] Existing shuffle_reader tests pass (`cargo test -p ballista-core shuffle_reader`)
   - [x] Existing client tests pass (`cargo test -p ballista-core client`)
   - [ ] Manual testing with distributed queries to verify connection reuse

