This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26551
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit ab6684636d9888aae559c6d6ce4041fc25ccd515
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Sep 30 11:32:50 2025 +0300

    IGNITE-26551 wip
---
 .../testframework/ExecutorServiceExtension.java    | 83 +++++++++++++++++++---
 1 file changed, 75 insertions(+), 8 deletions(-)

diff --git 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java
 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java
index 44595d02317..5a01ddc6e11 100644
--- 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java
+++ 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java
@@ -18,17 +18,22 @@
 package org.apache.ignite.internal.testframework;
 
 import static java.lang.reflect.Modifier.isStatic;
-import static java.util.concurrent.Executors.newFixedThreadPool;
-import static java.util.concurrent.Executors.newScheduledThreadPool;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
@@ -133,12 +138,14 @@ public class ExecutorServiceExtension implements 
BeforeAllCallback, AfterAllCall
             return;
         }
 
+        String methodName = 
context.getTestMethod().map(Method::getName).orElse(null);
+
         List<ExecutorService> executorServices = 
getOrCreateExecutorServiceListInStore(context, forStatic);
 
         for (Field field : fields) {
             checkFieldTypeIsSupported(field);
 
-            ExecutorService executorService = createExecutorService(field);
+            ExecutorService executorService = createExecutorService(field, 
methodName);
 
             executorServices.add(executorService);
 
@@ -188,12 +195,12 @@ public class ExecutorServiceExtension implements 
BeforeAllCallback, AfterAllCall
         return false;
     }
 
-    private static ExecutorService createExecutorService(Field field) {
+    private static ExecutorService createExecutorService(Field field, 
@Nullable String methodName) {
         InjectExecutorService injectExecutorService = 
field.getAnnotation(InjectExecutorService.class);
 
         assert injectExecutorService != null : field;
 
-        return createExecutorService(injectExecutorService, field.getName(), 
field.getType(), field.getDeclaringClass(), null);
+        return createExecutorService(injectExecutorService, field.getName(), 
field.getType(), field.getDeclaringClass(), methodName);
     }
 
     private static ExecutorService createExecutorService(
@@ -211,12 +218,17 @@ public class ExecutorServiceExtension implements 
BeforeAllCallback, AfterAllCall
                 threadPrefix,
                 false,
                 Loggers.forClass(testClass),
-                allowedOperations);
+                allowedOperations
+        );
+
+        var shutdownStackTraceHolder = new AtomicReference<Exception>();
+
+        RejectedExecutionHandler rejectedExecutionHandler = 
newRejectedExecutionHandler(threadPrefix, shutdownStackTraceHolder);
 
         if (fieldType.equals(ScheduledExecutorService.class)) {
-            return newScheduledThreadPool(threadCount == 0 ? 1 : threadCount, 
threadFactory);
+            return newScheduledThreadPool(threadCount, threadFactory, 
rejectedExecutionHandler, shutdownStackTraceHolder);
         } else if (fieldType.equals(ExecutorService.class)) {
-            return newFixedThreadPool(threadCount == 0 ? CPUS : threadCount, 
threadFactory);
+            return newFixedThreadPool(threadCount, threadFactory, 
rejectedExecutionHandler, shutdownStackTraceHolder);
         }
 
         throw new AssertionError(
@@ -258,4 +270,59 @@ public class ExecutorServiceExtension implements 
BeforeAllCallback, AfterAllCall
 
         return executorServices;
     }
+
+    private static RejectedExecutionHandler newRejectedExecutionHandler(
+            String threadPrefix,
+            AtomicReference<Exception> shutdownStackTraceHolder
+    ) {
+        return (r, executor) -> {
+            String message = "Task " + r.toString()
+                    + " for threads with prefix " + threadPrefix
+                    + " rejected from " + executor.toString();
+
+            throw new RejectedExecutionException(message, 
shutdownStackTraceHolder.get());
+        };
+    }
+
+    private static ExecutorService newFixedThreadPool(
+            int threadCount,
+            ThreadFactory threadFactory,
+            RejectedExecutionHandler rejectedExecutionHandler,
+            AtomicReference<Exception> shutdownStackTraceHolder
+    ) {
+        int poolSize = threadCount == 0 ? CPUS : threadCount;
+
+        return new ThreadPoolExecutor(
+                poolSize,
+                poolSize,
+                0L,
+                TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(),
+                threadFactory,
+                rejectedExecutionHandler
+        ) {
+            @Override
+            public void shutdown() {
+                shutdownStackTraceHolder.compareAndSet(null, new Exception());
+
+                super.shutdown();
+            }
+        };
+    }
+
+    private static ScheduledExecutorService newScheduledThreadPool(
+            int threadCount,
+            ThreadFactory threadFactory,
+            RejectedExecutionHandler rejectedExecutionHandler,
+            AtomicReference<Exception> shutdownStackTraceHolder
+    ) {
+        return new ScheduledThreadPoolExecutor(threadCount == 0 ? 1 : 
threadCount, threadFactory, rejectedExecutionHandler) {
+            @Override
+            public void shutdown() {
+                shutdownStackTraceHolder.compareAndSet(null, new Exception());
+
+                super.shutdown();
+            }
+        };
+    }
 }

Reply via email to