Copilot commented on code in PR #13705: URL: https://github.com/apache/skywalking/pull/13705#discussion_r2811623387
########## oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/VirtualThreadScheduledExecutor.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.util; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; + +/** + * A {@link ScheduledExecutorService} fully backed by virtual threads. + * + * <p>All methods — including {@code schedule()}, {@code scheduleAtFixedRate()}, + * and {@code scheduleWithFixedDelay()} — delegate to virtual threads. + * Scheduling is implemented by sleeping in a virtual thread (which does not + * block OS threads), eliminating the need for a platform timer thread. 
+ * + * <p>This adapter bridges the gap between virtual thread executors (which return + * {@link ExecutorService}) and frameworks like Armeria that require a + * {@link ScheduledExecutorService} for their blocking task executor. + */ +@Slf4j +final class VirtualThreadScheduledExecutor implements ScheduledExecutorService { + + private final ExecutorService vtExecutor; + + VirtualThreadScheduledExecutor(final ExecutorService vtExecutor) { + this.vtExecutor = vtExecutor; + } + + // --- Core execution: delegate to virtual threads --- + + @Override + public void execute(final Runnable command) { + vtExecutor.execute(command); + } + + @Override + public Future<?> submit(final Runnable task) { + return vtExecutor.submit(task); + } + + @Override + public <T> Future<T> submit(final Runnable task, final T result) { + return vtExecutor.submit(task, result); + } + + @Override + public <T> Future<T> submit(final Callable<T> task) { + return vtExecutor.submit(task); + } + + @Override + public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) + throws InterruptedException { + return vtExecutor.invokeAll(tasks); + } + + @Override + public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, + final long timeout, final TimeUnit unit) + throws InterruptedException { + return vtExecutor.invokeAll(tasks, timeout, unit); + } + + @Override + public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + return vtExecutor.invokeAny(tasks); + } + + @Override + public <T> T invokeAny(final Collection<? 
extends Callable<T>> tasks, + final long timeout, final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return vtExecutor.invokeAny(tasks, timeout, unit); + } + + // --- Scheduling: sleep in virtual thread, then execute --- + + @Override + public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) { + final long triggerNanos = System.nanoTime() + unit.toNanos(delay); + final VirtualScheduledFuture<Void> sf = new VirtualScheduledFuture<>(triggerNanos); + sf.setFuture(vtExecutor.submit(() -> { + sleepUntil(triggerNanos); + command.run(); + return null; + })); + return sf; + } + + @Override + public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, + final TimeUnit unit) { + final long triggerNanos = System.nanoTime() + unit.toNanos(delay); + final VirtualScheduledFuture<V> sf = new VirtualScheduledFuture<>(triggerNanos); + sf.setFuture(vtExecutor.submit(() -> { + sleepUntil(triggerNanos); + return callable.call(); + })); + return sf; + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, + final long period, final TimeUnit unit) { + final long periodNanos = unit.toNanos(period); + final long firstTrigger = System.nanoTime() + unit.toNanos(initialDelay); + final VirtualScheduledFuture<Void> sf = new VirtualScheduledFuture<>(firstTrigger); + sf.setFuture(vtExecutor.submit(() -> { + long nextTrigger = firstTrigger; + sleepUntil(nextTrigger); + while (!Thread.currentThread().isInterrupted()) { + command.run(); + nextTrigger += periodNanos; + sf.updateTriggerNanos(nextTrigger); + sleepUntil(nextTrigger); + } + return null; + })); + return sf; + } Review Comment: The scheduleAtFixedRate() method doesn't handle exceptions thrown by the command.run() call. If the command throws an exception, it will propagate up and terminate the scheduling loop without proper logging or notification. 
Note that the standard ScheduledThreadPoolExecutor behaves the same way: per the ScheduledExecutorService contract, if any execution throws, subsequent executions are suppressed and the exception is reported only through the returned future, so a failing periodic task can stop silently. Consider wrapping command.run() in a try-catch block to at least log the exception, and to continue the scheduling loop if that is the intended behavior. ########## .github/workflows/publish-docker.yaml: ########## @@ -75,6 +75,11 @@ jobs: uses: docker/setup-qemu-action@v3 - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 + - name: Build and push docker images based on Java 11 + env: + SW_OAP_BASE_IMAGE: eclipse-temurin:11-jre + TAG: ${{ env.TAG }}-java11 + run: make build.all docker.push Review Comment: The docker workflow reorders the Java 11 build step before Java 17 and Java 21, and removes the explicit Java 25 build (which is now the default). While functionally this should work, the conventional ordering would be to keep versions in ascending order (11, 17, 21) followed by the default. Consider reordering lines 78-82 to come after the Java 21 step (lines 88-92) to maintain chronological version ordering for better maintainability. ########## oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/VirtualThreadScheduledExecutor.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.util; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; + +/** + * A {@link ScheduledExecutorService} fully backed by virtual threads. + * + * <p>All methods — including {@code schedule()}, {@code scheduleAtFixedRate()}, + * and {@code scheduleWithFixedDelay()} — delegate to virtual threads. + * Scheduling is implemented by sleeping in a virtual thread (which does not + * block OS threads), eliminating the need for a platform timer thread. + * + * <p>This adapter bridges the gap between virtual thread executors (which return + * {@link ExecutorService}) and frameworks like Armeria that require a + * {@link ScheduledExecutorService} for their blocking task executor. 
+ */ +@Slf4j +final class VirtualThreadScheduledExecutor implements ScheduledExecutorService { + + private final ExecutorService vtExecutor; + + VirtualThreadScheduledExecutor(final ExecutorService vtExecutor) { + this.vtExecutor = vtExecutor; + } + + // --- Core execution: delegate to virtual threads --- + + @Override + public void execute(final Runnable command) { + vtExecutor.execute(command); + } + + @Override + public Future<?> submit(final Runnable task) { + return vtExecutor.submit(task); + } + + @Override + public <T> Future<T> submit(final Runnable task, final T result) { + return vtExecutor.submit(task, result); + } + + @Override + public <T> Future<T> submit(final Callable<T> task) { + return vtExecutor.submit(task); + } + + @Override + public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) + throws InterruptedException { + return vtExecutor.invokeAll(tasks); + } + + @Override + public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, + final long timeout, final TimeUnit unit) + throws InterruptedException { + return vtExecutor.invokeAll(tasks, timeout, unit); + } + + @Override + public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + return vtExecutor.invokeAny(tasks); + } + + @Override + public <T> T invokeAny(final Collection<? 
extends Callable<T>> tasks, + final long timeout, final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return vtExecutor.invokeAny(tasks, timeout, unit); + } + + // --- Scheduling: sleep in virtual thread, then execute --- + + @Override + public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) { + final long triggerNanos = System.nanoTime() + unit.toNanos(delay); + final VirtualScheduledFuture<Void> sf = new VirtualScheduledFuture<>(triggerNanos); + sf.setFuture(vtExecutor.submit(() -> { + sleepUntil(triggerNanos); + command.run(); + return null; + })); + return sf; + } + + @Override + public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, + final TimeUnit unit) { + final long triggerNanos = System.nanoTime() + unit.toNanos(delay); + final VirtualScheduledFuture<V> sf = new VirtualScheduledFuture<>(triggerNanos); + sf.setFuture(vtExecutor.submit(() -> { + sleepUntil(triggerNanos); + return callable.call(); + })); + return sf; + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, + final long period, final TimeUnit unit) { + final long periodNanos = unit.toNanos(period); + final long firstTrigger = System.nanoTime() + unit.toNanos(initialDelay); + final VirtualScheduledFuture<Void> sf = new VirtualScheduledFuture<>(firstTrigger); + sf.setFuture(vtExecutor.submit(() -> { + long nextTrigger = firstTrigger; + sleepUntil(nextTrigger); + while (!Thread.currentThread().isInterrupted()) { + command.run(); + nextTrigger += periodNanos; + sf.updateTriggerNanos(nextTrigger); + sleepUntil(nextTrigger); + } + return null; + })); + return sf; + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, + final long delay, final TimeUnit unit) { + final long delayNanos = unit.toNanos(delay); + final long firstTrigger = System.nanoTime() + 
unit.toNanos(initialDelay); + final VirtualScheduledFuture<Void> sf = new VirtualScheduledFuture<>(firstTrigger); + sf.setFuture(vtExecutor.submit(() -> { + sleepUntil(firstTrigger); + while (!Thread.currentThread().isInterrupted()) { + command.run(); + final long nextTrigger = System.nanoTime() + delayNanos; + sf.updateTriggerNanos(nextTrigger); + sleepUntil(nextTrigger); + } + return null; + })); + return sf; + } Review Comment: The scheduleWithFixedDelay() method doesn't handle exceptions thrown by the command.run() call. If the command throws an exception, it will propagate up and terminate the scheduling loop without proper logging or notification. Note that the standard ScheduledThreadPoolExecutor behaves the same way: per the ScheduledExecutorService contract, if any execution throws, subsequent executions are suppressed and the exception is reported only through the returned future, so a failing periodic task can stop silently. Consider wrapping command.run() in a try-catch block to at least log the exception, and to continue the scheduling loop if that is the intended behavior. ########## docs/en/changes/changes.md: ########## @@ -12,6 +12,20 @@ * Add `CLAUDE.md` as AI assistant guide for the project. * Upgrade Groovy to 5.0.3 in OAP backend. * Bump up nodejs to v24.13.0 for the latest UI(booster-ui) compiling. +* Add virtual thread support (JDK 25+) for gRPC and Armeria HTTP server handler threads. + Set `SW_VIRTUAL_THREADS_ENABLED=false` to disable. + + | Pool | Threads (JDK < 25) | Threads (JDK 25+) | + |---|---|---| + | gRPC server handler (`core-grpc`) | Cached platform (unbounded) | Virtual threads | + | HTTP blocking (`core-http`) | Cached platform (max 200) | Virtual threads | + | HTTP blocking (`firehose-http`) | Cached platform (max 200) | Virtual threads | + | HTTP blocking (`logql-http`) | Cached platform (max 200) | Virtual threads | + | HTTP blocking (`promql-http`) | Cached platform (max 200) | Virtual threads | + | VT carrier threads (ForkJoinPool) | N/A | 9 shared | + + On JDK 25+, all 5 pools share 9 carrier threads instead of up to 800+ platform threads.
Review Comment: The changelog mentions "all 5 pools share 9 carrier threads", which matches the 5 pools listed in the table: 4 HTTP pools (core-http, firehose-http, logql-http, promql-http) plus 1 gRPC pool. However, the table should also include the other HTTP servers that are updated in this PR: zipkin-http, receiver-http, and zipkin-query-http. Please update the table and the count to accurately reflect all affected pools, or clarify which pools are being counted in the "all 5 pools" statement. ```suggestion | HTTP blocking (`zipkin-http`) | Cached platform (max 200) | Virtual threads | | HTTP blocking (`receiver-http`) | Cached platform (max 200) | Virtual threads | | HTTP blocking (`zipkin-query-http`) | Cached platform (max 200) | Virtual threads | | VT carrier threads (ForkJoinPool) | N/A | 9 shared | On JDK 25+, all 8 pools above share 9 carrier threads instead of up to 1,400+ platform threads in the HTTP pools. ``` ########## docs/en/setup/backend/configuration-vocabulary.md: ########## @@ -542,6 +542,16 @@ OAP will query the data from the "hot and warm" stage by default if the "warm" s | property | - | - | The group settings of property, such as UI and profiling. | - | - | | - | shardNum | - | Shards Number for property group. | SW_STORAGE_BANYANDB_PROPERTY_SHARD_NUM | 1 | | - | replicas | - | Replicas for property group. |SW_STORAGE_BANYANDB_PROPERTY_REPLICAS | 0 | + +## Standalone Environment Variables +The following environment variables are **not** backed by `application.yml`. They are read directly from the +process environment and take effect across all modules.
+ +| Environment Variable | Value(s) and Explanation | Default | +|-----------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| +| SW_OAL_ENGINE_DEBUG | Set to any non-empty value to dump OAL-generated `.class` files to disk (under the `oal-rt/` directory relative to the OAP working path). Useful for debugging code generation issues. Leave unset in production. | (not set, no files written) | +| SW_VIRTUAL_THREADS_ENABLED | Set to `false` to disable virtual threads on JDK 25+. On JDK 25+, gRPC server handler threads are virtual threads by default. Set this variable to `false` to force traditional platform thread pools. Ignored on JDK versions below 25. | (not set, virtual threads enabled on JDK 25+) | Review Comment: The documentation for SW_VIRTUAL_THREADS_ENABLED only mentions gRPC server handler threads but the feature also affects HTTP blocking task executors for Armeria servers (core-http, firehose-http, logql-http, promql-http, zipkin-http, etc.). Please update the description to mention both gRPC and HTTP server handler threads to accurately reflect the scope of this environment variable. ```suggestion | SW_VIRTUAL_THREADS_ENABLED | Set to `false` to disable virtual threads on JDK 25+. On JDK 25+, gRPC server handler threads and HTTP blocking task executors for Armeria servers (core-http, firehose-http, logql-http, promql-http, zipkin-http, etc.) are virtual threads by default. Set this variable to `false` to force traditional platform thread pools. Ignored on JDK versions below 25. 
| (not set, virtual threads enabled on JDK 25+) | ``` ########## oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/VirtualThreadScheduledExecutor.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.util; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; + +/** + * A {@link ScheduledExecutorService} fully backed by virtual threads. + * + * <p>All methods — including {@code schedule()}, {@code scheduleAtFixedRate()}, + * and {@code scheduleWithFixedDelay()} — delegate to virtual threads. + * Scheduling is implemented by sleeping in a virtual thread (which does not + * block OS threads), eliminating the need for a platform timer thread. 
+ * + * <p>This adapter bridges the gap between virtual thread executors (which return + * {@link ExecutorService}) and frameworks like Armeria that require a + * {@link ScheduledExecutorService} for their blocking task executor. + */ +@Slf4j +final class VirtualThreadScheduledExecutor implements ScheduledExecutorService { + + private final ExecutorService vtExecutor; + + VirtualThreadScheduledExecutor(final ExecutorService vtExecutor) { + this.vtExecutor = vtExecutor; + } + + // --- Core execution: delegate to virtual threads --- + + @Override + public void execute(final Runnable command) { + vtExecutor.execute(command); + } + + @Override + public Future<?> submit(final Runnable task) { + return vtExecutor.submit(task); + } + + @Override + public <T> Future<T> submit(final Runnable task, final T result) { + return vtExecutor.submit(task, result); + } + + @Override + public <T> Future<T> submit(final Callable<T> task) { + return vtExecutor.submit(task); + } + + @Override + public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) + throws InterruptedException { + return vtExecutor.invokeAll(tasks); + } + + @Override + public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, + final long timeout, final TimeUnit unit) + throws InterruptedException { + return vtExecutor.invokeAll(tasks, timeout, unit); + } + + @Override + public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + return vtExecutor.invokeAny(tasks); + } + + @Override + public <T> T invokeAny(final Collection<? 
extends Callable<T>> tasks, + final long timeout, final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return vtExecutor.invokeAny(tasks, timeout, unit); + } + + // --- Scheduling: sleep in virtual thread, then execute --- + + @Override + public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) { + final long triggerNanos = System.nanoTime() + unit.toNanos(delay); + final VirtualScheduledFuture<Void> sf = new VirtualScheduledFuture<>(triggerNanos); + sf.setFuture(vtExecutor.submit(() -> { + sleepUntil(triggerNanos); + command.run(); + return null; + })); + return sf; + } + + @Override + public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, + final TimeUnit unit) { + final long triggerNanos = System.nanoTime() + unit.toNanos(delay); + final VirtualScheduledFuture<V> sf = new VirtualScheduledFuture<>(triggerNanos); + sf.setFuture(vtExecutor.submit(() -> { + sleepUntil(triggerNanos); + return callable.call(); + })); + return sf; + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, + final long period, final TimeUnit unit) { + final long periodNanos = unit.toNanos(period); + final long firstTrigger = System.nanoTime() + unit.toNanos(initialDelay); + final VirtualScheduledFuture<Void> sf = new VirtualScheduledFuture<>(firstTrigger); + sf.setFuture(vtExecutor.submit(() -> { + long nextTrigger = firstTrigger; + sleepUntil(nextTrigger); + while (!Thread.currentThread().isInterrupted()) { + command.run(); + nextTrigger += periodNanos; + sf.updateTriggerNanos(nextTrigger); + sleepUntil(nextTrigger); + } + return null; + })); + return sf; + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, + final long delay, final TimeUnit unit) { + final long delayNanos = unit.toNanos(delay); + final long firstTrigger = System.nanoTime() + 
unit.toNanos(initialDelay); + final VirtualScheduledFuture<Void> sf = new VirtualScheduledFuture<>(firstTrigger); + sf.setFuture(vtExecutor.submit(() -> { + sleepUntil(firstTrigger); + while (!Thread.currentThread().isInterrupted()) { + command.run(); + final long nextTrigger = System.nanoTime() + delayNanos; + sf.updateTriggerNanos(nextTrigger); + sleepUntil(nextTrigger); + } + return null; + })); + return sf; + } + + private static void sleepUntil(final long triggerNanos) throws InterruptedException { + long remaining = triggerNanos - System.nanoTime(); + while (remaining > 0) { + TimeUnit.NANOSECONDS.sleep(remaining); + remaining = triggerNanos - System.nanoTime(); + } + } + + // --- Lifecycle --- + + @Override + public void shutdown() { + vtExecutor.shutdown(); + } + + @Override + public List<Runnable> shutdownNow() { + return vtExecutor.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return vtExecutor.isShutdown(); + } + + @Override + public boolean isTerminated() { + return vtExecutor.isTerminated(); + } + + @Override + public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException { + return vtExecutor.awaitTermination(timeout, unit); + } + + /** + * A {@link ScheduledFuture} backed by a virtual thread {@link Future}. 
+ */ + static final class VirtualScheduledFuture<V> implements ScheduledFuture<V> { + private volatile Future<V> delegate; + private volatile long triggerNanos; + + VirtualScheduledFuture(final long triggerNanos) { + this.triggerNanos = triggerNanos; + } + + void setFuture(final Future<V> delegate) { + this.delegate = delegate; + } + + void updateTriggerNanos(final long triggerNanos) { + this.triggerNanos = triggerNanos; + } + + @Override + public long getDelay(final TimeUnit unit) { + return unit.convert(triggerNanos - System.nanoTime(), TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(final Delayed other) { + if (other == this) { + return 0; + } + return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS)); + } + + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + return delegate.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return delegate.get(); + } + + @Override + public V get(final long timeout, final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate.get(timeout, unit); + } Review Comment: The VirtualScheduledFuture class has a potential race condition where methods like cancel(), isCancelled(), isDone(), and get() could be called before setFuture() is invoked, resulting in a NullPointerException when accessing the delegate field. While the schedule() methods call setFuture() immediately after creating the VirtualScheduledFuture, there's no happens-before guarantee that prevents another thread from calling these methods on the returned ScheduledFuture before setFuture() completes. 
Consider adding null checks or using a CountDownLatch/volatile initialization flag to ensure the delegate is set before any methods can access it. ########## oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/VirtualThreadScheduledExecutor.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.util; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; + +/** + * A {@link ScheduledExecutorService} fully backed by virtual threads. + * + * <p>All methods — including {@code schedule()}, {@code scheduleAtFixedRate()}, + * and {@code scheduleWithFixedDelay()} — delegate to virtual threads. 
+ * Scheduling is implemented by sleeping in a virtual thread (which does not + * block OS threads), eliminating the need for a platform timer thread. + * + * <p>This adapter bridges the gap between virtual thread executors (which return + * {@link ExecutorService}) and frameworks like Armeria that require a + * {@link ScheduledExecutorService} for their blocking task executor. + */ +@Slf4j +final class VirtualThreadScheduledExecutor implements ScheduledExecutorService { + + private final ExecutorService vtExecutor; + + VirtualThreadScheduledExecutor(final ExecutorService vtExecutor) { + this.vtExecutor = vtExecutor; + } + + // --- Core execution: delegate to virtual threads --- + + @Override + public void execute(final Runnable command) { + vtExecutor.execute(command); + } + + @Override + public Future<?> submit(final Runnable task) { + return vtExecutor.submit(task); + } + + @Override + public <T> Future<T> submit(final Runnable task, final T result) { + return vtExecutor.submit(task, result); + } + + @Override + public <T> Future<T> submit(final Callable<T> task) { + return vtExecutor.submit(task); + } + + @Override + public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) + throws InterruptedException { + return vtExecutor.invokeAll(tasks); + } + + @Override + public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, + final long timeout, final TimeUnit unit) + throws InterruptedException { + return vtExecutor.invokeAll(tasks, timeout, unit); + } + + @Override + public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + return vtExecutor.invokeAny(tasks); + } + + @Override + public <T> T invokeAny(final Collection<? 
extends Callable<T>> tasks, + final long timeout, final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return vtExecutor.invokeAny(tasks, timeout, unit); + } + + // --- Scheduling: sleep in virtual thread, then execute --- + + @Override + public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) { + final long triggerNanos = System.nanoTime() + unit.toNanos(delay); + final VirtualScheduledFuture<Void> sf = new VirtualScheduledFuture<>(triggerNanos); + sf.setFuture(vtExecutor.submit(() -> { + sleepUntil(triggerNanos); + command.run(); + return null; + })); + return sf; + } + + @Override + public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, + final TimeUnit unit) { + final long triggerNanos = System.nanoTime() + unit.toNanos(delay); + final VirtualScheduledFuture<V> sf = new VirtualScheduledFuture<>(triggerNanos); + sf.setFuture(vtExecutor.submit(() -> { + sleepUntil(triggerNanos); + return callable.call(); + })); + return sf; + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, + final long period, final TimeUnit unit) { + final long periodNanos = unit.toNanos(period); + final long firstTrigger = System.nanoTime() + unit.toNanos(initialDelay); + final VirtualScheduledFuture<Void> sf = new VirtualScheduledFuture<>(firstTrigger); + sf.setFuture(vtExecutor.submit(() -> { + long nextTrigger = firstTrigger; + sleepUntil(nextTrigger); + while (!Thread.currentThread().isInterrupted()) { + command.run(); + nextTrigger += periodNanos; + sf.updateTriggerNanos(nextTrigger); + sleepUntil(nextTrigger); + } + return null; + })); + return sf; + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, + final long delay, final TimeUnit unit) { + final long delayNanos = unit.toNanos(delay); + final long firstTrigger = System.nanoTime() + 
unit.toNanos(initialDelay); + final VirtualScheduledFuture<Void> sf = new VirtualScheduledFuture<>(firstTrigger); + sf.setFuture(vtExecutor.submit(() -> { + sleepUntil(firstTrigger); + while (!Thread.currentThread().isInterrupted()) { + command.run(); + final long nextTrigger = System.nanoTime() + delayNanos; + sf.updateTriggerNanos(nextTrigger); + sleepUntil(nextTrigger); + } + return null; + })); + return sf; + } + + private static void sleepUntil(final long triggerNanos) throws InterruptedException { + long remaining = triggerNanos - System.nanoTime(); + while (remaining > 0) { + TimeUnit.NANOSECONDS.sleep(remaining); Review Comment: The sleepUntil() method uses a spin-loop with TimeUnit.NANOSECONDS.sleep(remaining) which could be inefficient if the sleep is interrupted frequently or if System.nanoTime() drifts. While this is acceptable for virtual threads (which are cheap), consider adding a minimum sleep threshold (e.g., 1ms) to avoid excessive loop iterations for very short remaining times, or use Thread.sleep(0) to yield if remaining is less than 1ms. This would reduce CPU usage from clock checks in tight loops. ```suggestion private static final long MIN_SLEEP_NANOS = TimeUnit.MILLISECONDS.toNanos(1); private static void sleepUntil(final long triggerNanos) throws InterruptedException { long remaining = triggerNanos - System.nanoTime(); while (remaining > 0) { final long sleepNanos = remaining >= MIN_SLEEP_NANOS ? remaining : 0L; if (sleepNanos > 0L) { TimeUnit.NANOSECONDS.sleep(sleepNanos); } else { // For very short remaining times, yield the virtual thread to avoid a tight loop. Thread.sleep(0); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
