This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26551 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit ab6684636d9888aae559c6d6ce4041fc25ccd515 Author: Kirill Tkalenko <[email protected]> AuthorDate: Tue Sep 30 11:32:50 2025 +0300 IGNITE-26551 wip --- .../testframework/ExecutorServiceExtension.java | 83 +++++++++++++++++++--- 1 file changed, 75 insertions(+), 8 deletions(-) diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java index 44595d02317..5a01ddc6e11 100644 --- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java +++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java @@ -18,17 +18,22 @@ package org.apache.ignite.internal.testframework; import static java.lang.reflect.Modifier.isStatic; -import static java.util.concurrent.Executors.newFixedThreadPool; -import static java.util.concurrent.Executors.newScheduledThreadPool; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.thread.IgniteThreadFactory; @@ -133,12 +138,14 @@ public class ExecutorServiceExtension implements BeforeAllCallback, AfterAllCall return; } + String methodName = context.getTestMethod().map(Method::getName).orElse(null); + List<ExecutorService> executorServices = getOrCreateExecutorServiceListInStore(context, forStatic); for (Field field : fields) { checkFieldTypeIsSupported(field); - ExecutorService executorService = createExecutorService(field); + ExecutorService executorService = createExecutorService(field, methodName); executorServices.add(executorService); @@ -188,12 +195,12 @@ public class ExecutorServiceExtension implements BeforeAllCallback, AfterAllCall return false; } - private static ExecutorService createExecutorService(Field field) { + private static ExecutorService createExecutorService(Field field, @Nullable String methodName) { InjectExecutorService injectExecutorService = field.getAnnotation(InjectExecutorService.class); assert injectExecutorService != null : field; - return createExecutorService(injectExecutorService, field.getName(), field.getType(), field.getDeclaringClass(), null); + return createExecutorService(injectExecutorService, field.getName(), field.getType(), field.getDeclaringClass(), methodName); } private static ExecutorService createExecutorService( @@ -211,12 +218,17 @@ public class ExecutorServiceExtension implements BeforeAllCallback, AfterAllCall threadPrefix, false, Loggers.forClass(testClass), - allowedOperations); + allowedOperations + ); + + var shutdownStackTraceHolder = new AtomicReference<Exception>(); + + RejectedExecutionHandler rejectedExecutionHandler = newRejectedExecutionHandler(threadPrefix, shutdownStackTraceHolder); if (fieldType.equals(ScheduledExecutorService.class)) { - return newScheduledThreadPool(threadCount == 0 ? 1 : threadCount, threadFactory); + return newScheduledThreadPool(threadCount, threadFactory, rejectedExecutionHandler, shutdownStackTraceHolder); } else if (fieldType.equals(ExecutorService.class)) { - return newFixedThreadPool(threadCount == 0 ? CPUS : threadCount, threadFactory); + return newFixedThreadPool(threadCount, threadFactory, rejectedExecutionHandler, shutdownStackTraceHolder); } throw new AssertionError( @@ -258,4 +270,59 @@ public class ExecutorServiceExtension implements BeforeAllCallback, AfterAllCall return executorServices; } + + private static RejectedExecutionHandler newRejectedExecutionHandler( + String threadPrefix, + AtomicReference<Exception> shutdownStackTraceHolder + ) { + return (r, executor) -> { + String message = "Task " + r.toString() + + " for threads with prefix " + threadPrefix + + " rejected from " + executor.toString(); + + throw new RejectedExecutionException(message, shutdownStackTraceHolder.get()); + }; + } + + private static ExecutorService newFixedThreadPool( + int threadCount, + ThreadFactory threadFactory, + RejectedExecutionHandler rejectedExecutionHandler, + AtomicReference<Exception> shutdownStackTraceHolder + ) { + int poolSize = threadCount == 0 ? CPUS : threadCount; + + return new ThreadPoolExecutor( + poolSize, + poolSize, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + threadFactory, + rejectedExecutionHandler + ) { + @Override + public void shutdown() { + shutdownStackTraceHolder.compareAndSet(null, new Exception()); + + super.shutdown(); + } + }; + } + + private static ScheduledExecutorService newScheduledThreadPool( + int threadCount, + ThreadFactory threadFactory, + RejectedExecutionHandler rejectedExecutionHandler, + AtomicReference<Exception> shutdownStackTraceHolder + ) { + return new ScheduledThreadPoolExecutor(threadCount == 0 ? 1 : threadCount, threadFactory, rejectedExecutionHandler) { + @Override + public void shutdown() { + shutdownStackTraceHolder.compareAndSet(null, new Exception()); + + super.shutdown(); + } + }; + } }
