This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch virtual-threads in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit a778b3b41f1a18e79fced49865b2b0fab0dc4cbf Author: Wu Sheng <[email protected]> AuthorDate: Mon Feb 16 15:08:02 2026 +0800 Add virtual thread support for gRPC servers on JDK 25+ Introduce VirtualThreads utility that enables virtual-thread-per-task executors for all gRPC servers when running on JDK 25+, with automatic fallback to platform thread pools on older JDKs. JDK 25 is required as the first LTS with the synchronized pinning fix (JEP 491). Each gRPC server gets a distinct thread pool name (core-grpc, receiver-grpc, als-grpc, ebpf-grpc) for log identification, with virtual threads prefixed by "vt:" in their names. A kill switch (SW_VIRTUAL_THREADS_ENABLED=false) allows disabling virtual threads at runtime. Also documents SW_OAL_ENGINE_DEBUG alongside the new flag in configuration-vocabulary.md. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- docs/en/setup/backend/configuration-vocabulary.md | 10 ++ .../oap/server/core/CoreModuleProvider.java | 1 + .../oap/server/library/server/grpc/GRPCServer.java | 26 +++- .../oap/server/library/util/VirtualThreads.java | 171 +++++++++++++++++++++ .../server/library/util/VirtualThreadsTest.java | 130 ++++++++++++++++ .../envoy/EnvoyMetricReceiverProvider.java | 1 + .../ebpf/provider/EBPFReceiverProvider.java | 1 + .../server/SharingServerModuleProvider.java | 1 + 8 files changed, 336 insertions(+), 5 deletions(-) diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md index 93fd86460f..cac56c080c 100644 --- a/docs/en/setup/backend/configuration-vocabulary.md +++ b/docs/en/setup/backend/configuration-vocabulary.md @@ -542,6 +542,16 @@ OAP will query the data from the "hot and warm" stage by default if the "warm" s | property | - | - | The group settings of property, such as UI and profiling. | - | - | | - | shardNum | - | Shards Number for property group. | SW_STORAGE_BANYANDB_PROPERTY_SHARD_NUM | 1 | | - | replicas | - | Replicas for property group. |SW_STORAGE_BANYANDB_PROPERTY_REPLICAS | 0 | + +## Standalone Environment Variables +The following environment variables are **not** backed by `application.yml`. They are read directly from the +process environment and take effect across all modules. + +| Environment Variable | Value(s) and Explanation | Default | +|-----------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| +| SW_OAL_ENGINE_DEBUG | Set to any non-empty value to dump OAL-generated `.class` files to disk (under the `oal-rt/` directory relative to the OAP working path). Useful for debugging code generation issues. Leave unset in production. | (not set, no files written) | +| SW_VIRTUAL_THREADS_ENABLED | Set to `false` to disable virtual threads on JDK 25+. On JDK 25+, gRPC server handler threads are virtual threads by default. Set this variable to `false` to force traditional platform thread pools. Ignored on JDK versions below 25. | (not set, virtual threads enabled on JDK 25+) | + ## Note ยน System Environment Variable name could be declared and changed in `application.yml/bydb.yaml`. The names listed here are simply diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index d55dc78853..507c346200 100755 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -249,6 +249,7 @@ public class CoreModuleProvider extends ModuleProvider { if (moduleConfig.getGRPCThreadPoolSize() > 0) { grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize()); } + grpcServer.setThreadPoolName("core-grpc"); grpcServer.initialize(); HTTPServerConfig httpServerConfig = HTTPServerConfig.builder() diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java index 068597b166..3c55cbeb0c 100644 --- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java +++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java @@ -38,6 +38,7 @@ import org.apache.skywalking.oap.server.library.server.Server; import org.apache.skywalking.oap.server.library.server.ServerException; import org.apache.skywalking.oap.server.library.server.grpc.ssl.DynamicSslContext; import org.apache.skywalking.oap.server.library.server.pool.CustomThreadFactory; +import org.apache.skywalking.oap.server.library.util.VirtualThreads; @Slf4j public class GRPCServer implements Server { @@ -53,6 +54,7 @@ public class GRPCServer implements Server { private String trustedCAsFile; private DynamicSslContext sslContext; private int threadPoolSize; + private String threadPoolName = "grpcServerPool"; private static final Marker SERVER_START_MARKER = MarkerFactory.getMarker("Console"); public GRPCServer(String host, int port) { @@ -72,6 +74,10 @@ public class GRPCServer implements Server { this.threadPoolSize = threadPoolSize; } + public void setThreadPoolName(String threadPoolName) { + this.threadPoolName = threadPoolName; + } + /** * Require for `server.crt` and `server.pem` for open ssl at server side. * @@ -96,11 +102,21 @@ public class GRPCServer implements Server { if (maxMessageSize > 0) { nettyServerBuilder.maxInboundMessageSize(maxMessageSize); } - if (threadPoolSize > 0) { - ExecutorService executor = new ThreadPoolExecutor( - threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), - new CustomThreadFactory("grpcServerPool"), new CustomRejectedExecutionHandler() - ); + final ExecutorService executor = VirtualThreads.createExecutor( + threadPoolName, + () -> { + if (threadPoolSize > 0) { + return new ThreadPoolExecutor( + threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new CustomThreadFactory(threadPoolName), + new CustomRejectedExecutionHandler() + ); + } + return null; + } + ); + if (executor != null) { nettyServerBuilder.executor(executor); } diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/VirtualThreads.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/VirtualThreads.java new file mode 100644 index 0000000000..d796ebec2c --- /dev/null +++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/VirtualThreads.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.util; + +import java.lang.reflect.Method; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; + +/** + * Unified executor factory for both virtual threads (JDK 25+) and platform threads. + * + * <p>Virtual threads (JEP 444) are available since JDK 21, but JDK 21-23 has a critical + * thread pinning bug where {@code synchronized} blocks prevent virtual threads from + * unmounting from carrier threads (see JEP 491). This was fixed in JDK 24, but JDK 24 + * is non-LTS. JDK 25 LTS is the first long-term support release with the fix. + * + * <p>This utility requires <b>JDK 25+</b> to enable virtual threads, ensuring both + * the pinning fix and LTS support are present. All created threads (virtual or platform) + * are named with the provided prefix for monitoring and debugging. + */ +@Slf4j +public final class VirtualThreads { + + /** + * The minimum JDK version required for virtual thread support. + * JDK 25 is the first LTS with the synchronized pinning fix (JEP 491). + */ + static final int MINIMUM_JDK_VERSION = 25; + + private static final boolean SUPPORTED; + + /* + * Cached reflection handles for JDK 25+ virtual thread APIs: + * Thread.ofVirtual() -> Thread.Builder.OfVirtual + * Thread.Builder#name(String prefix, long start) -> Thread.Builder + * Thread.Builder#factory() -> ThreadFactory + * Executors.newThreadPerTaskExecutor(ThreadFactory) -> ExecutorService + */ + private static final Method OF_VIRTUAL; + private static final Method BUILDER_NAME; + private static final Method BUILDER_FACTORY; + private static final Method NEW_THREAD_PER_TASK_EXECUTOR; + + /** + * System environment variable to disable virtual threads on JDK 25+. + * Set {@code SW_VIRTUAL_THREADS_ENABLED=false} to force platform threads. + */ + static final String ENV_VIRTUAL_THREADS_ENABLED = "SW_VIRTUAL_THREADS_ENABLED"; + + static { + final int jdkVersion = Runtime.version().feature(); + boolean supported = false; + Method ofVirtual = null; + Method builderName = null; + Method builderFactory = null; + Method newThreadPerTaskExecutor = null; + + final String envValue = System.getenv(ENV_VIRTUAL_THREADS_ENABLED); + final boolean disabledByEnv = "false".equalsIgnoreCase(envValue); + + if (disabledByEnv) { + log.info("Virtual threads disabled by environment variable {}={}", + ENV_VIRTUAL_THREADS_ENABLED, envValue); + } else if (jdkVersion >= MINIMUM_JDK_VERSION) { + try { + ofVirtual = Thread.class.getMethod("ofVirtual"); + final Class<?> builderClass = Class.forName("java.lang.Thread$Builder"); + builderName = builderClass.getMethod("name", String.class, long.class); + builderFactory = builderClass.getMethod("factory"); + newThreadPerTaskExecutor = Executors.class.getMethod( + "newThreadPerTaskExecutor", ThreadFactory.class); + supported = true; + log.info("Virtual threads available (JDK {})", jdkVersion); + } catch (final ReflectiveOperationException e) { + log.warn("JDK {} meets version requirement but virtual thread API " + + "not found, virtual threads disabled", jdkVersion, e); + } + } else { + log.info("Virtual threads require JDK {}+, current JDK is {}", + MINIMUM_JDK_VERSION, jdkVersion); + } + + SUPPORTED = supported; + OF_VIRTUAL = ofVirtual; + BUILDER_NAME = builderName; + BUILDER_FACTORY = builderFactory; + NEW_THREAD_PER_TASK_EXECUTOR = newThreadPerTaskExecutor; + } + + private VirtualThreads() { + } + + /** + * @return true if the current JDK version is 25+ and virtual thread API is available. + */ + public static boolean isSupported() { + return SUPPORTED; + } + + /** + * Create a named executor service with virtual threads enabled by default. + * On JDK 25+, creates a virtual-thread-per-task executor with threads named + * {@code {namePrefix}-0}, {@code {namePrefix}-1}, etc. + * On older JDKs, delegates to the provided {@code platformExecutorSupplier}. + * + * @param namePrefix prefix for virtual thread names + * @param platformExecutorSupplier supplies the platform-thread executor as fallback + * @return virtual thread executor on JDK 25+, or the supplier's executor otherwise + */ + public static ExecutorService createExecutor(final String namePrefix, + final Supplier<ExecutorService> platformExecutorSupplier) { + return createExecutor(namePrefix, true, platformExecutorSupplier); + } + + /** + * Create a named executor service. When {@code enableVirtualThreads} is true and JDK 25+, + * creates a virtual-thread-per-task executor with threads named + * {@code {namePrefix}-0}, {@code {namePrefix}-1}, etc. + * Otherwise, delegates to the provided {@code platformExecutorSupplier}. + * + * @param namePrefix prefix for virtual thread names + * @param enableVirtualThreads whether to use virtual threads (requires JDK 25+) + * @param platformExecutorSupplier supplies the platform-thread executor as fallback + * @return virtual thread executor or the supplier's executor + */ + public static ExecutorService createExecutor(final String namePrefix, + final boolean enableVirtualThreads, + final Supplier<ExecutorService> platformExecutorSupplier) { + if (enableVirtualThreads && SUPPORTED) { + try { + return createVirtualThreadExecutor(namePrefix); + } catch (final ReflectiveOperationException e) { + log.warn("Failed to create virtual thread executor [{}], " + + "falling back to platform threads", namePrefix, e); + } + } + return platformExecutorSupplier.get(); + } + + private static ExecutorService createVirtualThreadExecutor( + final String namePrefix) throws ReflectiveOperationException { + // Thread.ofVirtual().name("vt:" + namePrefix + "-", 0).factory() + final Object builder = OF_VIRTUAL.invoke(null); + final Object namedBuilder = BUILDER_NAME.invoke(builder, "vt:" + namePrefix + "-", 0L); + final ThreadFactory factory = (ThreadFactory) BUILDER_FACTORY.invoke(namedBuilder); + // Executors.newThreadPerTaskExecutor(factory) + final ExecutorService executor = + (ExecutorService) NEW_THREAD_PER_TASK_EXECUTOR.invoke(null, factory); + log.info("Created virtual-thread-per-task executor [{}]", namePrefix); + return executor; + } +} diff --git a/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/VirtualThreadsTest.java b/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/VirtualThreadsTest.java new file mode 100644 index 0000000000..d445915446 --- /dev/null +++ b/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/VirtualThreadsTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.server.library.util; + +import java.lang.reflect.Method; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class VirtualThreadsTest { + + @Test + public void testIsSupportedMatchesJdkVersion() { + final int jdkVersion = Runtime.version().feature(); + final boolean expected = jdkVersion >= VirtualThreads.MINIMUM_JDK_VERSION; + assertEquals(expected, VirtualThreads.isSupported()); + } + + @Test + public void testVirtualThreadExecutor() throws Exception { + if (!VirtualThreads.isSupported()) { + return; + } + final ExecutorService executor = VirtualThreads.createExecutor( + "vt-check", true, () -> Executors.newSingleThreadExecutor()); + try { + final ThreadCapture capture = submitAndCapture(executor); + assertTrue(capture.name.startsWith("vt:vt-check-"), + "Virtual thread name should start with 'vt:vt-check-', but was: " + capture.name); + assertTrue(isVirtual(capture.thread), + "Thread should be virtual on JDK 25+"); + } finally { + executor.shutdown(); + } + } + + @Test + public void testForcePlatformFallback() throws Exception { + if (!VirtualThreads.isSupported()) { + return; + } + final AtomicLong counter = new AtomicLong(0); + final ExecutorService executor = VirtualThreads.createExecutor( + "pt-check", false, () -> new ThreadPoolExecutor( + 2, 2, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), + r -> new Thread(r, "pt-check-" + counter.getAndIncrement()) + )); + try { + final ThreadCapture capture = submitAndCapture(executor); + assertTrue(capture.name.startsWith("pt-check-"), + "Platform thread name should start with 'pt-check-', but was: " + capture.name); + assertFalse(isVirtual(capture.thread), + "Thread should NOT be virtual when enableVirtualThreads=false"); + } finally { + executor.shutdown(); + } + } + + @Test + public void testFallbackUsedWhenNotSupported() { + if (VirtualThreads.isSupported()) { + return; + } + final ExecutorService fallback = Executors.newSingleThreadExecutor(); + try { + final ExecutorService result = VirtualThreads.createExecutor("test", () -> fallback); + assertSame(fallback, result); + } finally { + fallback.shutdown(); + } + } + + private ThreadCapture submitAndCapture(final ExecutorService executor) throws InterruptedException { + final AtomicReference<Thread> threadRef = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + executor.submit(() -> { + threadRef.set(Thread.currentThread()); + latch.countDown(); + }); + assertTrue(latch.await(5, TimeUnit.SECONDS), "Task did not complete in time"); + final Thread thread = threadRef.get(); + assertNotNull(thread); + return new ThreadCapture(thread, thread.getName()); + } + + /** + * Check Thread.isVirtual() via reflection (JDK 21+ API, compiled against JDK 11). + */ + private static boolean isVirtual(final Thread thread) throws Exception { + final Method isVirtual = Thread.class.getMethod("isVirtual"); + return (boolean) isVirtual.invoke(thread); + } + + private static class ThreadCapture { + final Thread thread; + final String name; + + ThreadCapture(final Thread thread, final String name) { + this.thread = thread; + this.name = name; + } + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java index ac4bcd5c1e..4f7b54928f 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java @@ -102,6 +102,7 @@ public class EnvoyMetricReceiverProvider extends ModuleProvider { if (config.getGRPCThreadPoolSize() > 0) { grpcServer.setThreadPoolSize(config.getGRPCThreadPoolSize()); } + grpcServer.setThreadPoolName("als-grpc"); grpcServer.initialize(); this.receiverGRPCHandlerRegister = new GRPCHandlerRegisterImpl(grpcServer); diff --git a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/EBPFReceiverProvider.java b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/EBPFReceiverProvider.java index 6041ac865b..f48ee9928f 100644 --- a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/EBPFReceiverProvider.java +++ b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/EBPFReceiverProvider.java @@ -107,6 +107,7 @@ public class EBPFReceiverProvider extends ModuleProvider { if (config.getGRPCThreadPoolSize() > 0) { grpcServer.setThreadPoolSize(config.getGRPCThreadPoolSize()); } + grpcServer.setThreadPoolName("ebpf-grpc"); grpcServer.initialize(); this.receiverGRPCHandlerRegister = new GRPCHandlerRegisterImpl(grpcServer); diff --git a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java index 933e095207..a37e8093f0 100644 --- a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java +++ b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java @@ -128,6 +128,7 @@ public class SharingServerModuleProvider extends ModuleProvider { if (config.getGRPCThreadPoolSize() > 0) { grpcServer.setThreadPoolSize(config.getGRPCThreadPoolSize()); } + grpcServer.setThreadPoolName("receiver-grpc"); grpcServer.initialize(); GRPCHandlerRegisterImpl grpcHandlerRegister = new GRPCHandlerRegisterImpl(grpcServer);
