moomindani opened a new pull request, #55502:
URL: https://github.com/apache/spark/pull/55502

   ### What changes were proposed in this pull request?
   
   Add an opt-in config
`spark.storage.blockManagerMaster.virtualThread.enabled` (default `false`).
When it is enabled and the driver runs on Java 21+, `BlockManagerMasterEndpoint`
builds its ask thread pool with `Executors.newVirtualThreadPerTaskExecutor()`.
The method is invoked via reflection so the source still compiles on Java 17,
the same approach as SPARK-50383. Otherwise the existing cached pool of up to
100 platform threads is used unchanged.
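
   The construction path described above can be sketched in plain Java. This is a minimal sketch, not the PR's Scala code, and it rests on three assumptions. First, `AskPoolFactory` and `newDaemonBoundedPool` are hypothetical names. Second, the fallback only approximates what Spark's `ThreadUtils.newDaemonCachedThreadPool` builds, assumed here to be a fixed-size `ThreadPoolExecutor` whose core threads time out. Third, the thread-name prefix is illustrative.

```java
import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class AskPoolFactory {
  private AskPoolFactory() {}

  // Platform-thread fallback (assumed shape): at most maxThreads daemon threads,
  // an unbounded work queue, and idle threads (core ones included) retired after 60 s.
  static ThreadPoolExecutor newDaemonBoundedPool(String prefix, int maxThreads) {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = runnable -> {
      Thread t = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
      t.setDaemon(true);
      return t;
    };
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        maxThreads, maxThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory);
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  // Returns ExecutorService rather than ThreadPoolExecutor so either backend fits.
  static ExecutorService createAskThreadPool(boolean useVirtualThreads) {
    if (useVirtualThreads) {
      try {
        // Looked up reflectively so this file still compiles with --release 17.
        Method m = Executors.class.getMethod("newVirtualThreadPerTaskExecutor");
        return (ExecutorService) m.invoke(null);
      } catch (ReflectiveOperationException e) {
        // Method missing (pre-Java 21) or the call failed: fall back to platform threads.
      }
    }
    return newDaemonBoundedPool("block-manager-ask-thread-pool", 100);
  }
}
```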
   
   If the config is enabled on a JVM older than Java 21, the platform-thread
pool is used and a warning is logged, so the flag is never silently ignored.
An info message is logged when virtual threads are in use.
   
   Pool construction is extracted into package-private helpers on the companion
object (`BlockManagerMasterEndpoint.createAskThreadPool` and
`shouldUseVirtualThreads`), so the unit tests and the benchmark exercise the
exact production code path.
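
   The following is a minimal sketch of the decision a helper like `shouldUseVirtualThreads` makes, including the warning and info logs described earlier. It uses Java and `java.util.logging` rather than the PR's Scala and Spark's logging. The class name, the overload that takes the Java feature version, and the log wording are all hypothetical.

```java
import java.util.logging.Logger;

final class VirtualThreadGate {
  private static final Logger LOG = Logger.getLogger(VirtualThreadGate.class.getName());

  private VirtualThreadGate() {}

  // Hypothetical overload: the Java feature version is passed in so both branches
  // can be exercised on a single JDK.
  static boolean shouldUseVirtualThreads(boolean configEnabled, int javaFeatureVersion) {
    if (!configEnabled) {
      return false; // default: keep the platform-thread pool and log nothing
    }
    if (javaFeatureVersion < 21) {
      // Surface the ignored flag instead of falling back silently.
      LOG.warning("spark.storage.blockManagerMaster.virtualThread.enabled is set, but Java "
          + javaFeatureVersion + " < 21; using platform threads");
      return false;
    }
    LOG.info("Using virtual threads for the block manager master ask thread pool");
    return true;
  }

  // Reads the running JVM's feature version (for example 17 or 21).
  static boolean shouldUseVirtualThreads(boolean configEnabled) {
    return shouldUseVirtualThreads(configEnabled, Runtime.version().feature());
  }
}
```

Passing the version in is one way to cover the Java < 21 branch on a Java 21 JDK. The PR's suite instead uses `assume(...)` and runs on each JDK.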
   
   The static type of the `askThreadPool` field is widened from
`ThreadPoolExecutor` to its supertype `ExecutorService` so that both backends
fit, since the virtual-thread executor is not a `ThreadPoolExecutor`. The field
is `private` and is only used via `fromExecutorService(...)` and
`shutdownNow()`, both of which `ExecutorService` supports.
   
   ### Why are the changes needed?
   
   This is a sub-task under 
[SPARK-49807](https://issues.apache.org/jira/browse/SPARK-49807) (Incorporate 
Java Virtual Threads). It extends the opt-in pattern established by 
[SPARK-50383](https://issues.apache.org/jira/browse/SPARK-50383) (REST 
Submission API) and 
[SPARK-56297](https://issues.apache.org/jira/browse/SPARK-56297) 
(reconciliation thread pool) to the driver's block-manager control plane.
   
   `BlockManagerMasterEndpoint`'s ask pool serves outbound RPCs for `removeRdd`
/ `removeShuffle` / `removeBroadcast` / `removeBlock`, replication fan-out, and
push-based shuffle merger bookkeeping. Each submitted task spends almost all of
its time blocked on the RPC round-trip. The pool is hard-capped at 100 platform
threads. On a cluster with N > 100 executors, a single `Future.sequence`
fan-out (for example, `removeRdd` when a cached RDD is unpersisted) therefore
serialises into `ceil(N/100)` waves of blocking I/O. This happens even though
the N calls are independent and could all be in flight at once.
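
   The wave argument can be written as an idealised latency model. It assumes every round-trip takes the same time $t_{\text{rpc}}$, all $N$ tasks are submitted together, and nothing else is using the pool. These assumptions are not stated in the PR.

$$
T_{\text{platform}}(N) \approx \left\lceil \frac{N}{100} \right\rceil t_{\text{rpc}},
\qquad
T_{\text{virtual}}(N) \approx t_{\text{rpc}}
$$

   Take the benchmark's simulated $t_{\text{rpc}} = 20$ ms. With $N = 500$, platform threads need $\lceil 500/100 \rceil = 5$ waves, or about 100 ms. With $N = 5000$, they need 50 waves, or about 1000 ms. The virtual-thread estimate stays near 20 ms regardless of $N$, because no task has to wait for a free thread. The measured platform times (123 ms and 1168 ms) sit somewhat above these estimates.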
   
   The default is `false` rather than `true` (unlike SPARK-50383) because this
pool sits on the core driver runtime path rather than in a dedicated REST server.
A conservative opt-in gives operators time to validate virtual-thread behaviour 
against their deployment (Java version, JFR pipelines, thread-dump tooling) 
before enabling it.
   
   #### Benchmark
   
   A microbenchmark, `BlockManagerMasterEndpointAskBenchmark` (results file
committed), fans out N tasks over mock endpoints that each sleep 20 ms to
simulate an RPC round-trip. Results on OpenJDK 21.0.11, 3 iterations:
   
   | N | Platform (cap=100) | Virtual threads | Speedup |
   |---|---|---|---|
   | 50 | 26 ms | 20 ms | 1.3x |
   | 500 | 123 ms | 21 ms | 5.7x |
   | 5000 | 1168 ms | 31 ms | 38.1x |
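
   The fan-out being measured can be approximated with a naive wall-clock timer. This is a sketch under stated assumptions, not the committed `BlockManagerMasterEndpointAskBenchmark`:

   - `FanOutSketch`, `fanOutMillis`, and `platformPool` are hypothetical names.
   - `Thread.sleep` stands in for the mock endpoint's 20 ms round-trip.
   - Waiting on every `Future` plays the role of `Future.sequence`.

   The sketch has no warm-up or repetition, so absolute numbers will differ from the table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class FanOutSketch {
  private FanOutSketch() {}

  // Platform baseline: at most `cap` threads; excess tasks wait in the queue.
  static ThreadPoolExecutor platformPool(int cap) {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(cap, cap, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  // Submits n tasks that each sleep rpcMillis (a stand-in for one RPC round-trip),
  // waits for all of them, and returns the elapsed wall-clock time in milliseconds.
  static long fanOutMillis(ExecutorService pool, int n, long rpcMillis)
      throws InterruptedException, ExecutionException {
    List<Future<?>> futures = new ArrayList<>(n);
    long start = System.nanoTime();
    for (int i = 0; i < n; i++) {
      futures.add(pool.submit(() -> {
        Thread.sleep(rpcMillis);
        return null;
      }));
    }
    for (Future<?> f : futures) {
      f.get(); // block until every simulated round-trip has completed
    }
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  }

  public static void main(String[] args) throws Exception {
    ThreadPoolExecutor pool = platformPool(100);
    try {
      for (int n : new int[] {50, 500, 5000}) {
        System.out.printf("N=%d, platform (cap=100): %d ms%n", n, fanOutMillis(pool, n, 20));
      }
    } finally {
      pool.shutdownNow();
    }
  }
}
```

On Java 21+, passing `Executors.newVirtualThreadPerTaskExecutor()` as `pool` gives the virtual-thread column. It is left out here so the sketch also compiles on Java 17.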
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. The new config defaults to `false`; public APIs are unchanged. Users on 
Java 21+ can opt in via `--conf 
spark.storage.blockManagerMaster.virtualThread.enabled=true`.
   
   ### How was this patch tested?
   
   - New unit suite `BlockManagerMasterEndpointVirtualThreadsSuite` covering
config on/off, the Java 21+ path (`assume(isJavaVersionAtLeast21)`), the Java < 21
fallback (`assume(!isJavaVersionAtLeast21)`), and concurrent submission with
clean shutdown. Ran locally on both Java 17.0.18 and Java 21.0.11; on each JDK,
4 tests succeed and 1 is skipped by `assume`.
   - Existing `BlockManagerMasterSuite` continues to pass unchanged with the 
default-off config.
   - New benchmark `BlockManagerMasterEndpointAskBenchmark` producing 
`core/benchmarks/BlockManagerMasterEndpointAskBenchmark-jdk21-results.txt`. 
Results summarised above.
   - `dev/lint-scala`: Scalastyle and Scalafmt both pass.
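
   The concurrent-submission-and-clean-shutdown case can be pictured with the following hypothetical Java sketch. It is not the Scala `BlockManagerMasterEndpointVirtualThreadsSuite`: the class name, method name, and 10-second timeouts are assumptions. The same check is meant to apply to either pool backend.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ConcurrentShutdownCheck {
  private ConcurrentShutdownCheck() {}

  // Several submitter threads hand tasks to one pool at the same time. Returns true
  // only if every task ran and the pool fully terminated after shutdownNow().
  static boolean submitConcurrentlyThenShutdown(
      ExecutorService pool, int submitters, int tasksPerSubmitter) throws InterruptedException {
    CountDownLatch done = new CountDownLatch(submitters * tasksPerSubmitter);
    Thread[] threads = new Thread[submitters];
    for (int s = 0; s < submitters; s++) {
      threads[s] = new Thread(() -> {
        for (int i = 0; i < tasksPerSubmitter; i++) {
          pool.execute(done::countDown); // each task only records that it ran
        }
      });
      threads[s].start();
    }
    for (Thread t : threads) {
      t.join(); // every submission has now been handed to the pool
    }
    boolean allRan = done.await(10, TimeUnit.SECONDS);
    pool.shutdownNow();
    boolean terminated = pool.awaitTermination(10, TimeUnit.SECONDS);
    return allRan && terminated;
  }
}
```

On Java 21+, the same call can be made with `Executors.newVirtualThreadPerTaskExecutor()` as the pool.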
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (Anthropic)


-- 
This is an automated message from the Apache Git Service.