This is an automated email from the ASF dual-hosted git repository.
ilgrosso pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/syncope.git
The following commit(s) were added to refs/heads/master by this push:
new 7572b0c6b0 VirtualThreadPoolTaskExecutor improvements
7572b0c6b0 is described below
commit 7572b0c6b05896633d755780a920682a16a5e93f
Author: Francesco Chicchiriccò <[email protected]>
AuthorDate: Thu Jul 3 15:47:32 2025 +0200
VirtualThreadPoolTaskExecutor improvements
---
.../core/rest/cxf/IdRepoRESTCXFContext.java | 5 +-
.../core/rest/cxf/service/SyncopeServiceImpl.java | 6 +-
.../provisioning/java/ProvisioningContext.java | 8 +--
.../provisioning/java/job/MacroJobDelegate.java | 4 +-
.../PriorityPropagationTaskExecutor.java | 6 +-
.../spring/task/VirtualThreadPoolTaskExecutor.java | 66 ++++++++++++++++++++--
6 files changed, 75 insertions(+), 20 deletions(-)
diff --git
a/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/IdRepoRESTCXFContext.java
b/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/IdRepoRESTCXFContext.java
index 530becbf10..231a64f2a4 100644
---
a/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/IdRepoRESTCXFContext.java
+++
b/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/IdRepoRESTCXFContext.java
@@ -136,6 +136,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
+import org.springframework.core.task.AsyncTaskExecutor;
@PropertySource("classpath:errorMessages.properties")
@EnableConfigurationProperties(RESTProperties.class)
@@ -145,7 +146,7 @@ public class IdRepoRESTCXFContext {
private static final Logger LOG =
LoggerFactory.getLogger(IdRepoRESTCXFContext.class);
@Bean
- public VirtualThreadPoolTaskExecutor batchExecutor(final RESTProperties
props) {
+ public AsyncTaskExecutor batchExecutor(final RESTProperties props) {
VirtualThreadPoolTaskExecutor executor = new
VirtualThreadPoolTaskExecutor();
executor.setPoolSize(props.getBatchExecutor().getPoolSize());
executor.setAwaitTerminationSeconds(props.getBatchExecutor().getAwaitTerminationSeconds());
@@ -463,7 +464,7 @@ public class IdRepoRESTCXFContext {
final Bus bus,
final SyncopeLogic syncopeLogic,
@Qualifier("batchExecutor")
- final VirtualThreadPoolTaskExecutor batchExecutor,
+ final AsyncTaskExecutor batchExecutor,
final BatchDAO batchDAO,
final EntityFactory entityFactory) {
diff --git
a/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/service/SyncopeServiceImpl.java
b/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/service/SyncopeServiceImpl.java
index 1a3d7ff551..785c0a10fd 100644
---
a/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/service/SyncopeServiceImpl.java
+++
b/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/service/SyncopeServiceImpl.java
@@ -51,7 +51,7 @@ import org.apache.syncope.core.persistence.api.entity.Batch;
import org.apache.syncope.core.persistence.api.entity.EntityFactory;
import org.apache.syncope.core.rest.cxf.batch.BatchProcess;
import org.apache.syncope.core.spring.security.AuthContextUtils;
-import org.apache.syncope.core.spring.task.VirtualThreadPoolTaskExecutor;
+import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
@@ -61,7 +61,7 @@ public class SyncopeServiceImpl extends AbstractService
implements SyncopeServic
protected final SyncopeLogic logic;
- protected final VirtualThreadPoolTaskExecutor batchExecutor;
+ protected final AsyncTaskExecutor batchExecutor;
protected final Bus bus;
@@ -71,7 +71,7 @@ public class SyncopeServiceImpl extends AbstractService
implements SyncopeServic
public SyncopeServiceImpl(
final SyncopeLogic logic,
- final VirtualThreadPoolTaskExecutor batchExecutor,
+ final AsyncTaskExecutor batchExecutor,
final Bus bus,
final BatchDAO batchDAO,
final EntityFactory entityFactory) {
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java
index 47b0a67cd8..88a3d2fcad 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java
@@ -195,7 +195,7 @@ public class ProvisioningContext {
*/
@Bean
@Primary
- public VirtualThreadPoolTaskExecutor asyncConnectorFacadeExecutor(final
ProvisioningProperties props) {
+ public AsyncTaskExecutor asyncConnectorFacadeExecutor(final
ProvisioningProperties props) {
VirtualThreadPoolTaskExecutor executor = new
VirtualThreadPoolTaskExecutor();
executor.setPoolSize(props.getAsyncConnectorFacadeExecutor().getPoolSize());
executor.setAwaitTerminationSeconds(props.getAsyncConnectorFacadeExecutor().getAwaitTerminationSeconds());
@@ -208,7 +208,7 @@ public class ProvisioningContext {
@Bean
public AsyncConfigurer asyncConfigurer(
@Qualifier("asyncConnectorFacadeExecutor")
- final VirtualThreadPoolTaskExecutor asyncConnectorFacadeExecutor) {
+ final AsyncTaskExecutor asyncConnectorFacadeExecutor) {
return new AsyncConfigurer() {
@@ -226,7 +226,7 @@ public class ProvisioningContext {
* @return executor thread pool task executor
*/
@Bean
- public VirtualThreadPoolTaskExecutor
propagationTaskExecutorAsyncExecutor(final ProvisioningProperties props) {
+ public AsyncTaskExecutor propagationTaskExecutorAsyncExecutor(final
ProvisioningProperties props) {
VirtualThreadPoolTaskExecutor executor = new
VirtualThreadPoolTaskExecutor();
executor.setPoolSize(props.getPropagationTaskExecutorAsyncExecutor().getPoolSize());
executor.setWaitForTasksToCompleteOnShutdown(true);
@@ -459,7 +459,7 @@ public class ProvisioningContext {
@Bean
public PropagationTaskExecutor propagationTaskExecutor(
@Qualifier("propagationTaskExecutorAsyncExecutor")
- final VirtualThreadPoolTaskExecutor
propagationTaskExecutorAsyncExecutor,
+ final AsyncTaskExecutor propagationTaskExecutorAsyncExecutor,
final TaskUtilsFactory taskUtilsFactory,
final AnyUtilsFactory anyUtilsFactory,
final ConnectorManager connectorManager,
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java
index a2fd5aa0df..e0ece47ee7 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java
@@ -57,9 +57,9 @@ import org.apache.syncope.core.provisioning.api.macro.Command;
import org.apache.syncope.core.provisioning.api.macro.MacroActions;
import org.apache.syncope.core.provisioning.api.serialization.POJOHelper;
import org.apache.syncope.core.spring.implementation.ImplementationManager;
-import org.apache.syncope.core.spring.task.VirtualThreadPoolTaskExecutor;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.task.AsyncTaskExecutor;
import
org.springframework.security.concurrent.DelegatingSecurityContextCallable;
import org.springframework.util.ReflectionUtils;
@@ -74,7 +74,7 @@ public class MacroJobDelegate extends
AbstractSchedTaskJobDelegate<MacroTask> {
protected Validator validator;
@Resource(name = "batchExecutor")
- protected VirtualThreadPoolTaskExecutor taskExecutor;
+ protected AsyncTaskExecutor taskExecutor;
protected final Map<String, MacroActions> perContextActions = new
ConcurrentHashMap<>();
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java
index f4bfc86950..94ea5cdbcf 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java
@@ -49,8 +49,8 @@ import
org.apache.syncope.core.provisioning.api.propagation.PropagationTaskInfo;
import org.apache.syncope.core.provisioning.java.pushpull.OutboundMatcher;
import org.apache.syncope.core.provisioning.java.utils.ConnObjectUtils;
import org.apache.syncope.core.spring.security.AuthContextUtils;
-import org.apache.syncope.core.spring.task.VirtualThreadPoolTaskExecutor;
import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
@@ -63,7 +63,7 @@ import
org.springframework.security.core.context.SecurityContextHolder;
*/
public class PriorityPropagationTaskExecutor extends
AbstractPropagationTaskExecutor {
- protected final VirtualThreadPoolTaskExecutor taskExecutor;
+ protected final AsyncTaskExecutor taskExecutor;
public PriorityPropagationTaskExecutor(
final ConnectorManager connectorManager,
@@ -79,7 +79,7 @@ public class PriorityPropagationTaskExecutor extends
AbstractPropagationTaskExec
final OutboundMatcher outboundMatcher,
final PlainAttrValidationManager validator,
final ApplicationEventPublisher publisher,
- final VirtualThreadPoolTaskExecutor taskExecutor) {
+ final AsyncTaskExecutor taskExecutor) {
super(connectorManager,
connObjectUtils,
diff --git
a/core/spring/src/main/java/org/apache/syncope/core/spring/task/VirtualThreadPoolTaskExecutor.java
b/core/spring/src/main/java/org/apache/syncope/core/spring/task/VirtualThreadPoolTaskExecutor.java
index 6bcf57d0a8..fea02c8f2b 100644
---
a/core/spring/src/main/java/org/apache/syncope/core/spring/task/VirtualThreadPoolTaskExecutor.java
+++
b/core/spring/src/main/java/org/apache/syncope/core/spring/task/VirtualThreadPoolTaskExecutor.java
@@ -18,16 +18,18 @@
*/
package org.apache.syncope.core.spring.task;
+import java.util.List;
import java.util.Optional;
+import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskDecorator;
-import org.springframework.core.task.support.ExecutorServiceAdapter;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.scheduling.concurrent.ExecutorConfigurationSupport;
@@ -39,6 +41,8 @@ public class VirtualThreadPoolTaskExecutor
private int poolSize = -1;
+ private long taskTerminationTimeout;
+
private TaskDecorator taskDecorator;
private SimpleAsyncTaskExecutor executor;
@@ -53,10 +57,24 @@ public class VirtualThreadPoolTaskExecutor
}
/**
- * @return the maximum number of managed threads
+ * Specify a timeout (in milliseconds) for task termination when closing
+ * this executor. The default is 0, not waiting for task termination at
all.
+ * <p>
+ * Note that a concrete >0 timeout specified here will lead to the
+ * wrapping of every submitted task into a task-tracking runnable which
+ * involves considerable overhead in case of a high number of tasks.
+ * However, for a modest level of submissions with longer-running
+ * tasks, this is feasible in order to arrive at a graceful shutdown.
+ * <p>
+ * Note that {@code SimpleAsyncTaskExecutor} does not participate in
+ * a coordinated lifecycle stop but rather just awaits task termination
+ * on {@link #close()}.
+ *
+ * @param taskTerminationTimeout the timeout in milliseconds
+ * @see SimpleAsyncTaskExecutor#close
*/
- public int getPoolSize() {
- return poolSize;
+ public void setTaskTerminationTimeout(final long taskTerminationTimeout) {
+ this.taskTerminationTimeout = taskTerminationTimeout;
}
/**
@@ -94,10 +112,46 @@ public class VirtualThreadPoolTaskExecutor
executor = new SimpleAsyncTaskExecutor(getThreadNamePrefix());
executor.setVirtualThreads(true);
+ executor.setDaemon(true);
executor.setConcurrencyLimit(poolSize);
+ if (taskTerminationTimeout >= 0) {
+ executor.setTaskTerminationTimeout(taskTerminationTimeout);
+ }
Optional.ofNullable(taskDecorator).ifPresent(executor::setTaskDecorator);
- return new ExecutorServiceAdapter(executor);
+ return new AbstractExecutorService() {
+
+ @Override
+ public void execute(final Runnable task) {
+ executor.execute(task);
+ }
+
+ @Override
+ public void shutdown() {
+ executor.close();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ executor.close();
+ return List.of();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return !executor.isActive();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return !executor.isActive();
+ }
+
+ @Override
+ public boolean awaitTermination(final long timeout, final TimeUnit
unit) throws InterruptedException {
+ return !executor.isActive();
+ }
+ };
}
@Override
@@ -117,6 +171,6 @@ public class VirtualThreadPoolTaskExecutor
@Override
public void shutdown() {
- // manual shutdown is not supported
+ executor.close();
}
}