moomindani opened a new pull request, #55502: URL: https://github.com/apache/spark/pull/55502
### What changes were proposed in this pull request?

Add an opt-in config `spark.storage.blockManagerMaster.virtualThread.enabled` (default `false`). When enabled and running on Java 21+, `BlockManagerMasterEndpoint` constructs its ask thread pool via `Executors.newVirtualThreadPerTaskExecutor()` (reached by reflection so the source stays Java 17 compatible, matching SPARK-50383). Otherwise the existing 100-thread cached platform-thread pool is used unchanged.

A warning is logged when the config is enabled on a JVM older than Java 21 (the flag is otherwise silently ignored), and an info log is emitted when virtual threads are in use.

The pool construction is extracted into a package-private companion helper (`BlockManagerMasterEndpoint.createAskThreadPool` / `shouldUseVirtualThreads`) so the unit tests and benchmark exercise the exact production code path.

The `askThreadPool` field's static type is narrowed from `ThreadPoolExecutor` to `ExecutorService` so that both backends fit; it is `private` and only used via `fromExecutorService(...)` and `shutdownNow()`, both supported on `ExecutorService`.

### Why are the changes needed?

This is a sub-task under [SPARK-49807](https://issues.apache.org/jira/browse/SPARK-49807) (Incorporate Java Virtual Threads). It extends the opt-in pattern established by [SPARK-50383](https://issues.apache.org/jira/browse/SPARK-50383) (REST Submission API) and [SPARK-56297](https://issues.apache.org/jira/browse/SPARK-56297) (reconciliation thread pool) to the driver's block-manager control plane.

`BlockManagerMasterEndpoint`'s ask pool serves outbound RPCs for `removeRdd` / `removeShuffle` / `removeBroadcast` / `removeBlock`, replication fan-out, and push-based shuffle merger bookkeeping. Each submitted task is purely blocked on the RPC round-trip.
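
For readers who want a concrete picture of the opt-in selection described under "What changes were proposed", here is a minimal Java sketch. It is not the code in this PR: the class name `AskPoolSketch`, the 60-second keep-alive, the thread name, and the method bodies are all assumptions made for illustration. Only the helper names `createAskThreadPool` / `shouldUseVirtualThreads`, the 100-thread cap, and the reflective call to `Executors.newVirtualThreadPerTaskExecutor()` come from the description above.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not the implementation in this PR.
class AskPoolSketch {
    // The description states a 100-thread cap for the platform-thread pool.
    static final int PLATFORM_THREAD_CAP = 100;

    // Virtual threads are chosen only when the flag is on AND the JVM is 21+.
    static boolean shouldUseVirtualThreads(boolean enabled) {
        return enabled && Runtime.version().feature() >= 21;
    }

    static ExecutorService createAskThreadPool(boolean enabled) {
        if (shouldUseVirtualThreads(enabled)) {
            try {
                // Looked up reflectively so this file still compiles on Java 17.
                Method m = Executors.class.getMethod("newVirtualThreadPerTaskExecutor");
                return (ExecutorService) m.invoke(null);
            } catch (ReflectiveOperationException e) {
                // Assumption: fall back to platform threads if the lookup fails.
            }
        }
        // Assumed shape of a capped pool: at most 100 daemon threads, idle
        // threads time out after 60 s, excess tasks wait in an unbounded queue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            PLATFORM_THREAD_CAP, PLATFORM_THREAD_CAP, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            r -> {
                Thread t = new Thread(r, "ask-sketch");
                t.setDaemon(true);
                return t;
            });
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    // Submits n tasks that each sleep sleepMs (standing in for a blocking RPC),
    // waits for all of them, and returns the elapsed wall-clock milliseconds.
    static long fanOutMillis(ExecutorService pool, int n, long sleepMs) {
        List<Future<?>> futures = new ArrayList<>(n);
        long start = System.nanoTime();
        for (int i = 0; i < n; i++) {
            futures.add(pool.submit(() -> {
                Thread.sleep(sleepMs);
                return null;
            }));
        }
        try {
            for (Future<?> f : futures) {
                f.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        boolean enabled = args.length > 0 && Boolean.parseBoolean(args[0]);
        ExecutorService pool = createAskThreadPool(enabled);
        try {
            System.out.println("virtual=" + shouldUseVirtualThreads(enabled)
                + " elapsedMs=" + fanOutMillis(pool, 300, 20));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Run with `true` as the first argument on a Java 21+ JVM, the 300-task fan-out would be expected to complete in roughly one sleep interval. With the capped pool, the same fan-out needs several passes through the 100 threads. Exact timings depend on the machine and are not claimed here.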

The pool is hard-capped at 100 platform threads, so on clusters with more than 100 executors a single `Future.sequence` fan-out (for example, `removeRdd` triggered by cache eviction) serialises into `ceil(N/100)` waves of blocking I/O even though the work is embarrassingly concurrent.

The default is `false` rather than `true` (unlike SPARK-50383) because this pool lives on the core driver runtime path rather than a dedicated REST server. A conservative opt-in gives operators time to validate virtual-thread behaviour against their deployment (Java version, JFR pipelines, thread-dump tooling) before enabling it.

#### Benchmark

A microbenchmark (`BlockManagerMasterEndpointAskBenchmark`, results file committed) that fans out N tasks over mock endpoints sleeping 20 ms to simulate an RPC round-trip (OpenJDK 21.0.11, 3 iterations):

| N | Platform (cap=100) | Virtual threads | Speedup |
|---|---|---|---|
| 50 | 26 ms | 20 ms | 1.3x |
| 500 | 123 ms | 21 ms | 5.7x |
| 5000 | 1168 ms | 31 ms | 38.1x |

### Does this PR introduce _any_ user-facing change?

No. The new config defaults to `false`; public APIs are unchanged. Users on Java 21+ can opt in via `--conf spark.storage.blockManagerMaster.virtualThread.enabled=true`.

### How was this patch tested?

- New unit suite `BlockManagerMasterEndpointVirtualThreadsSuite` covering: config on/off, Java 21+ path (`assume(isJavaVersionAtLeast21)`), Java < 21 fallback (`assume(!isJavaVersionAtLeast21)`), and concurrent submission + clean shutdown. Run on both Java 17.0.18 and Java 21.0.11 locally; each JDK reports 4 succeeded / 1 assume-skipped.
- Existing `BlockManagerMasterSuite` continues to pass unchanged with the default-off config.
- New benchmark `BlockManagerMasterEndpointAskBenchmark` producing `core/benchmarks/BlockManagerMasterEndpointAskBenchmark-jdk21-results.txt`. Results summarised above.
- `dev/lint-scala`: Scalastyle and Scalafmt both pass.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Anthropic)
