This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 757cb6b1aa1 IGNITE-26551 Expand the information in
RejectedExecutionException for pools from ExecutorServiceExtension (#6669)
757cb6b1aa1 is described below
commit 757cb6b1aa16f3ed46b09a366969c1d12a26bc8b
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Sep 30 16:56:04 2025 +0300
IGNITE-26551 Expand the information in RejectedExecutionException for pools
from ExecutorServiceExtension (#6669)
---
.../ExecutorServiceExtensionTest.java | 4 +-
.../testframework/ExecutorServiceExtension.java | 83 +++++++++++++++++++---
2 files changed, 77 insertions(+), 10 deletions(-)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/ExecutorServiceExtensionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/ExecutorServiceExtensionTest.java
index 249127816c4..734ab51894c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/ExecutorServiceExtensionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/ExecutorServiceExtensionTest.java
@@ -133,12 +133,12 @@ public class ExecutorServiceExtensionTest {
checkExecutorService(
instanceExecutorServiceWithDefaults,
CPUS,
- String.format(DEFAULT_THREAD_PREFIX_FORMAT,
"instanceExecutorServiceWithDefaults")
+ String.format(DEFAULT_THREAD_PREFIX_FORMAT,
"test-instanceExecutorServiceWithDefaults")
);
checkScheduledExecutorService(
instanceScheduledExecutorServiceWithDefaults,
1,
- String.format(DEFAULT_THREAD_PREFIX_FORMAT,
"instanceScheduledExecutorServiceWithDefaults")
+ String.format(DEFAULT_THREAD_PREFIX_FORMAT,
"test-instanceScheduledExecutorServiceWithDefaults")
);
checkExecutorService(
parameterExecutorServiceWithDefaults,
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java
index 44595d02317..770977514d4 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java
@@ -18,17 +18,22 @@
package org.apache.ignite.internal.testframework;
import static java.lang.reflect.Modifier.isStatic;
-import static java.util.concurrent.Executors.newFixedThreadPool;
-import static java.util.concurrent.Executors.newScheduledThreadPool;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
@@ -133,12 +138,14 @@ public class ExecutorServiceExtension implements BeforeAllCallback, AfterAllCall
return;
}
+ String methodName =
context.getTestMethod().map(Method::getName).orElse(null);
+
List<ExecutorService> executorServices =
getOrCreateExecutorServiceListInStore(context, forStatic);
for (Field field : fields) {
checkFieldTypeIsSupported(field);
- ExecutorService executorService = createExecutorService(field);
+ ExecutorService executorService = createExecutorService(field,
methodName);
executorServices.add(executorService);
@@ -188,12 +195,12 @@ public class ExecutorServiceExtension implements BeforeAllCallback, AfterAllCall
return false;
}
- private static ExecutorService createExecutorService(Field field) {
+ private static ExecutorService createExecutorService(Field field,
@Nullable String methodName) {
InjectExecutorService injectExecutorService =
field.getAnnotation(InjectExecutorService.class);
assert injectExecutorService != null : field;
- return createExecutorService(injectExecutorService, field.getName(),
field.getType(), field.getDeclaringClass(), null);
+ return createExecutorService(injectExecutorService, field.getName(),
field.getType(), field.getDeclaringClass(), methodName);
}
private static ExecutorService createExecutorService(
@@ -211,12 +218,17 @@ public class ExecutorServiceExtension implements BeforeAllCallback, AfterAllCall
threadPrefix,
false,
Loggers.forClass(testClass),
- allowedOperations);
+ allowedOperations
+ );
+
+ var shutdownExceptionHolder = new AtomicReference<Exception>();
+
+ RejectedExecutionHandler rejectedExecutionHandler =
newRejectedExecutionHandler(threadPrefix, shutdownExceptionHolder);
if (fieldType.equals(ScheduledExecutorService.class)) {
- return newScheduledThreadPool(threadCount == 0 ? 1 : threadCount,
threadFactory);
+ return newScheduledThreadPool(threadCount, threadFactory,
rejectedExecutionHandler, shutdownExceptionHolder);
} else if (fieldType.equals(ExecutorService.class)) {
- return newFixedThreadPool(threadCount == 0 ? CPUS : threadCount,
threadFactory);
+ return newFixedThreadPool(threadCount, threadFactory,
rejectedExecutionHandler, shutdownExceptionHolder);
}
throw new AssertionError(
@@ -258,4 +270,59 @@ public class ExecutorServiceExtension implements BeforeAllCallback, AfterAllCall
return executorServices;
}
+
+ private static RejectedExecutionHandler newRejectedExecutionHandler(
+ String threadPrefix,
+ AtomicReference<Exception> shutdownExceptionHolder
+ ) {
+ return (r, executor) -> {
+ String message = "Task " + r.toString()
+ + " for threads with prefix " + threadPrefix
+ + " rejected from " + executor.toString();
+
+ throw new RejectedExecutionException(message,
shutdownExceptionHolder.get());
+ };
+ }
+
+ private static ExecutorService newFixedThreadPool(
+ int threadCount,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler rejectedExecutionHandler,
+ AtomicReference<Exception> shutdownExceptionHolder
+ ) {
+ int poolSize = threadCount == 0 ? CPUS : threadCount;
+
+ return new ThreadPoolExecutor(
+ poolSize,
+ poolSize,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ threadFactory,
+ rejectedExecutionHandler
+ ) {
+ @Override
+ public void shutdown() {
+ shutdownExceptionHolder.compareAndSet(null, new
Exception("Shutdown tracker"));
+
+ super.shutdown();
+ }
+ };
+ }
+
+ private static ScheduledExecutorService newScheduledThreadPool(
+ int threadCount,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler rejectedExecutionHandler,
+ AtomicReference<Exception> shutdownExceptionHolder
+ ) {
+ return new ScheduledThreadPoolExecutor(threadCount == 0 ? 1 :
threadCount, threadFactory, rejectedExecutionHandler) {
+ @Override
+ public void shutdown() {
+ shutdownExceptionHolder.compareAndSet(null, new
Exception("Shutdown tracker"));
+
+ super.shutdown();
+ }
+ };
+ }
}