This is an automated email from the ASF dual-hosted git repository.
ilgrosso pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/syncope.git
The following commit(s) were added to refs/heads/master by this push:
new 420eea0fd2 [SYNCOPE-1953] Job stop improved
420eea0fd2 is described below
commit 420eea0fd22b749614a5037ad0ca1e05dbdd4b95
Author: Francesco Chicchiriccò <[email protected]>
AuthorDate: Thu Feb 26 15:28:48 2026 +0100
[SYNCOPE-1953] Job stop improved
---
.../core/persistence/api/dao/JobStatusDAO.java | 5 +
.../core/persistence/jpa/dao/JPAJobStatusDAO.java | 12 ++
.../persistence/neo4j/dao/Neo4jJobStatusDAO.java | 7 +
...eSchedTaskJobDelegate.java => JobDelegate.java} | 7 +-
.../provisioning/api/job/SchedTaskJobDelegate.java | 2 +-
...kJobDelegate.java => StoppableJobDelegate.java} | 5 +-
.../api/job/report/ReportJobDelegate.java | 3 +-
.../api/notification/NotificationJobDelegate.java | 3 +-
.../provisioning/java/ProvisioningContext.java | 8 --
.../provisioning/java/job/AfterHandlingJob.java | 6 +
.../provisioning/java/job/DefaultJobManager.java | 2 -
.../job/GroupMemberProvisionTaskJobDelegate.java | 150 +++++++++++++++------
.../syncope/core/provisioning/java/job/Job.java | 3 +
.../provisioning/java/job/MacroJobDelegate.java | 15 ++-
.../java/job/SyncopeTaskScheduler.java | 6 +-
.../java/job/SystemLoadReporterJob.java | 10 +-
.../java/job/notification/NotificationJob.java | 5 +
.../java/job/report/AbstractReportJobDelegate.java | 10 +-
.../provisioning/java/job/report/ReportJob.java | 43 ++++--
.../java/pushpull/LiveSyncJobDelegate.java | 4 +-
.../java/pushpull/PullJobDelegate.java | 4 +-
.../java/pushpull/PushJobDelegate.java | 4 +-
.../java/job/SyncopeTaskSchedulerTest.java | 6 +
.../core/starter/SyncopeCoreApplication.java | 5 +-
.../syncope/core/starter/actuate/JobEndpoint.java | 41 +++++-
25 files changed, 272 insertions(+), 94 deletions(-)
diff --git
a/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/JobStatusDAO.java
b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/JobStatusDAO.java
index 0fa09e7aa8..7fc49ebde7 100644
---
a/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/JobStatusDAO.java
+++
b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/JobStatusDAO.java
@@ -18,6 +18,9 @@
*/
package org.apache.syncope.core.persistence.api.dao;
+import java.util.List;
+import org.apache.syncope.core.persistence.api.entity.JobStatus;
+
public interface JobStatusDAO {
String JOB_FIRED_STATUS = "JOB_FIRED";
@@ -31,4 +34,6 @@ public interface JobStatusDAO {
void set(String key, String status);
String get(String key);
+
+ List<? extends JobStatus> findAll();
}
diff --git
a/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/dao/JPAJobStatusDAO.java
b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/dao/JPAJobStatusDAO.java
index 520808b525..b43ec9fb15 100644
---
a/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/dao/JPAJobStatusDAO.java
+++
b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/dao/JPAJobStatusDAO.java
@@ -22,6 +22,7 @@ import jakarta.persistence.EntityManager;
import jakarta.persistence.Query;
import java.util.List;
import org.apache.syncope.core.persistence.api.dao.JobStatusDAO;
+import org.apache.syncope.core.persistence.api.entity.JobStatus;
import org.apache.syncope.core.persistence.jpa.entity.JPAJobStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,4 +87,15 @@ public class JPAJobStatusDAO implements JobStatusDAO {
List<Object> result = query.getResultList();
return result.isEmpty() ? UNKNOWN_STATUS :
result.getFirst().toString();
}
+
+ @Transactional(readOnly = true)
+ @Override
+ public List<? extends JobStatus> findAll() {
+ Query query = entityManager.createNativeQuery(
+ "SELECT id,jobStatus FROM " + JPAJobStatus.TABLE + " ORDER BY id", JPAJobStatus.class);
+
+ @SuppressWarnings("unchecked")
+ List<JPAJobStatus> result = query.getResultList();
+ return result;
+ }
}
diff --git
a/core/persistence-neo4j/src/main/java/org/apache/syncope/core/persistence/neo4j/dao/Neo4jJobStatusDAO.java
b/core/persistence-neo4j/src/main/java/org/apache/syncope/core/persistence/neo4j/dao/Neo4jJobStatusDAO.java
index f283de6529..c7ca9549a1 100644
---
a/core/persistence-neo4j/src/main/java/org/apache/syncope/core/persistence/neo4j/dao/Neo4jJobStatusDAO.java
+++
b/core/persistence-neo4j/src/main/java/org/apache/syncope/core/persistence/neo4j/dao/Neo4jJobStatusDAO.java
@@ -18,6 +18,7 @@
*/
package org.apache.syncope.core.persistence.neo4j.dao;
+import java.util.List;
import org.apache.syncope.core.persistence.api.dao.JobStatusDAO;
import org.apache.syncope.core.persistence.api.entity.JobStatus;
import org.apache.syncope.core.persistence.neo4j.entity.Neo4jJobStatus;
@@ -81,4 +82,10 @@ public class Neo4jJobStatusDAO implements JobStatusDAO {
public String get(final String key) {
return neo4jTemplate.findById(key,
Neo4jJobStatus.class).map(JobStatus::getStatus).orElse(UNKNOWN_STATUS);
}
+
+ @Transactional(readOnly = true)
+ @Override
+ public List<? extends JobStatus> findAll() {
+ return
neo4jTemplate.findAll(Neo4jJobStatus.class).stream().map(JobStatus.class::cast).toList();
+ }
}
diff --git
a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/StoppableSchedTaskJobDelegate.java
b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java
similarity index 82%
copy from
core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/StoppableSchedTaskJobDelegate.java
copy to
core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java
index 9bb999ae96..815607a695 100644
---
a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/StoppableSchedTaskJobDelegate.java
+++
b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java
@@ -18,10 +18,5 @@
*/
package org.apache.syncope.core.provisioning.api.job;
-public interface StoppableSchedTaskJobDelegate extends SchedTaskJobDelegate {
-
- /**
- * Request the current Job to stop the execution of the running Task.
- */
- void stop();
+public interface JobDelegate {
}
diff --git
a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/SchedTaskJobDelegate.java
b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/SchedTaskJobDelegate.java
index 432b6ec5e6..49b303d4e0 100644
---
a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/SchedTaskJobDelegate.java
+++
b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/SchedTaskJobDelegate.java
@@ -21,7 +21,7 @@ package org.apache.syncope.core.provisioning.api.job;
import org.apache.syncope.common.lib.types.TaskType;
@FunctionalInterface
-public interface SchedTaskJobDelegate {
+public interface SchedTaskJobDelegate extends JobDelegate {
/**
* Executes a Job to run the given Task.
diff --git
a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/StoppableSchedTaskJobDelegate.java
b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/StoppableJobDelegate.java
similarity index 85%
rename from
core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/StoppableSchedTaskJobDelegate.java
rename to
core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/StoppableJobDelegate.java
index 9bb999ae96..102b238e72 100644
---
a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/StoppableSchedTaskJobDelegate.java
+++
b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/StoppableJobDelegate.java
@@ -18,10 +18,11 @@
*/
package org.apache.syncope.core.provisioning.api.job;
-public interface StoppableSchedTaskJobDelegate extends SchedTaskJobDelegate {
+@FunctionalInterface
+public interface StoppableJobDelegate {
/**
- * Request the current Job to stop the execution of the running Task.
+ * Request the current Job to stop the execution.
*/
void stop();
}
diff --git
a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/report/ReportJobDelegate.java
b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/report/ReportJobDelegate.java
index a8e686f586..d0410cef84 100644
---
a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/report/ReportJobDelegate.java
+++
b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/report/ReportJobDelegate.java
@@ -19,11 +19,12 @@
package org.apache.syncope.core.provisioning.api.job.report;
import org.apache.syncope.common.lib.report.ReportConf;
+import org.apache.syncope.core.provisioning.api.job.JobDelegate;
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext;
import org.apache.syncope.core.provisioning.api.job.JobExecutionException;
@FunctionalInterface
-public interface ReportJobDelegate {
+public interface ReportJobDelegate extends JobDelegate {
/**
* Optional configuration.
diff --git
a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/notification/NotificationJobDelegate.java
b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/notification/NotificationJobDelegate.java
index adf95bfd6b..b4a8438000 100644
---
a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/notification/NotificationJobDelegate.java
+++
b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/notification/NotificationJobDelegate.java
@@ -20,8 +20,9 @@ package org.apache.syncope.core.provisioning.api.notification;
import org.apache.syncope.core.persistence.api.entity.task.NotificationTask;
import org.apache.syncope.core.persistence.api.entity.task.TaskExec;
+import org.apache.syncope.core.provisioning.api.job.JobDelegate;
-public interface NotificationJobDelegate {
+public interface NotificationJobDelegate extends JobDelegate {
TaskExec<NotificationTask> executeSingle(NotificationTask task, String
executor);
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java
index 1458cfb449..f2e8af9217 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java
@@ -153,7 +153,6 @@ import
org.apache.syncope.core.provisioning.java.data.wa.WAClientAppDataBinderIm
import org.apache.syncope.core.provisioning.java.job.DefaultJobManager;
import org.apache.syncope.core.provisioning.java.job.JobStatusUpdater;
import org.apache.syncope.core.provisioning.java.job.SyncopeTaskScheduler;
-import org.apache.syncope.core.provisioning.java.job.SystemLoadReporterJob;
import
org.apache.syncope.core.provisioning.java.job.notification.MailNotificationJobDelegate;
import
org.apache.syncope.core.provisioning.java.job.notification.NotificationJob;
import
org.apache.syncope.core.provisioning.java.notification.DefaultNotificationManager;
@@ -174,7 +173,6 @@ import
org.apache.syncope.core.workflow.api.UserWorkflowAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import
org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
@@ -646,12 +644,6 @@ public class ProvisioningContext {
return new DefaultAuditManager(auditConfDAO, auditEventDAO,
entityFactory, auditEventProcessors, taskExecutor);
}
- @ConditionalOnMissingBean
- @Bean
- public SystemLoadReporterJob systemLoadReporterJob(final
ApplicationContext ctx) {
- return new SystemLoadReporterJob(ctx);
- }
-
@ConditionalOnMissingBean
@Bean
public NotificationJobDelegate notificationJobDelegate(
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AfterHandlingJob.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AfterHandlingJob.java
index febc8a91c3..e9705396ef 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AfterHandlingJob.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AfterHandlingJob.java
@@ -24,6 +24,7 @@ import java.util.Optional;
import org.apache.syncope.core.persistence.api.ApplicationContextProvider;
import org.apache.syncope.core.provisioning.api.AuditManager;
import org.apache.syncope.core.provisioning.api.event.AfterHandlingEvent;
+import org.apache.syncope.core.provisioning.api.job.JobDelegate;
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext;
import org.apache.syncope.core.provisioning.api.job.JobExecutionException;
import
org.apache.syncope.core.provisioning.api.notification.NotificationManager;
@@ -66,6 +67,11 @@ public class AfterHandlingJob extends Job {
@Autowired
private AuditManager auditManager;
+ @Override
+ protected JobDelegate getDelegate() {
+ return null;
+ }
+
@Override
protected void execute(final JobExecutionContext context) throws
JobExecutionException {
Optional<AfterHandlingEvent> event = Optional.ofNullable(
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/DefaultJobManager.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/DefaultJobManager.java
index c81d01fb0b..7e795f85b1 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/DefaultJobManager.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/DefaultJobManager.java
@@ -55,7 +55,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.support.CronTrigger;
-import org.springframework.transaction.annotation.Transactional;
public class DefaultJobManager implements JobManager, SyncopeCoreLoader {
@@ -297,7 +296,6 @@ public class DefaultJobManager implements JobManager,
SyncopeCoreLoader {
return 500;
}
- @Transactional
@Override
public void load(final String domain) {
AuthContextUtils.runAsAdmin(domain, () -> {
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/GroupMemberProvisionTaskJobDelegate.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/GroupMemberProvisionTaskJobDelegate.java
index 173f9fe7f0..9d16216347 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/GroupMemberProvisionTaskJobDelegate.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/GroupMemberProvisionTaskJobDelegate.java
@@ -21,12 +21,15 @@ package org.apache.syncope.core.provisioning.java.job;
import java.util.Collection;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
+import org.apache.syncope.common.lib.SyncopeConstants;
import org.apache.syncope.common.lib.to.PropagationStatus;
import org.apache.syncope.common.lib.types.AnyTypeKind;
import org.apache.syncope.common.lib.types.ProvisionAction;
import org.apache.syncope.common.lib.types.TaskType;
+import org.apache.syncope.core.persistence.api.dao.AnyDAO;
import org.apache.syncope.core.persistence.api.dao.AnySearchDAO;
import org.apache.syncope.core.persistence.api.dao.GroupDAO;
+import org.apache.syncope.core.persistence.api.dao.RealmDAO;
import org.apache.syncope.core.persistence.api.dao.search.MembershipCond;
import org.apache.syncope.core.persistence.api.dao.search.SearchCond;
import org.apache.syncope.core.persistence.api.entity.anyobject.AnyObject;
@@ -38,20 +41,28 @@ import
org.apache.syncope.core.provisioning.api.AnyObjectProvisioningManager;
import org.apache.syncope.core.provisioning.api.UserProvisioningManager;
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext;
import org.apache.syncope.core.provisioning.api.job.JobExecutionException;
+import org.apache.syncope.core.provisioning.api.job.StoppableJobDelegate;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.data.domain.Sort;
import org.springframework.transaction.annotation.Transactional;
-public class GroupMemberProvisionTaskJobDelegate extends
AbstractSchedTaskJobDelegate<SchedTask> {
+public class GroupMemberProvisionTaskJobDelegate
+ extends AbstractSchedTaskJobDelegate<SchedTask>
+ implements StoppableJobDelegate {
public static final String ACTION_JOBDETAIL_KEY = "action";
public static final String GROUP_KEY_JOBDETAIL_KEY = "groupKey";
+ @Autowired
+ private RealmDAO realmDAO;
+
@Autowired
private GroupDAO groupDAO;
@Autowired
- private AnySearchDAO searchDAO;
+ private AnySearchDAO anySearchDAO;
@Autowired
private UserProvisioningManager userProvisioningManager;
@@ -63,6 +74,13 @@ public class GroupMemberProvisionTaskJobDelegate extends
AbstractSchedTaskJobDel
private ProvisionAction action;
+ private volatile boolean stopRequested = false;
+
+ @Override
+ public void stop() {
+ stopRequested = true;
+ }
+
@Transactional
@Override
public void execute(
@@ -89,57 +107,111 @@ public class GroupMemberProvisionTaskJobDelegate extends
AbstractSchedTaskJobDel
setStatus(result.toString());
+ Collection<String> gResources = groupDAO.findAllResourceKeys(groupKey);
+
MembershipCond membershipCond = new MembershipCond();
membershipCond.setGroup(groupKey);
- List<User> users = searchDAO.search(SearchCond.of(membershipCond),
AnyTypeKind.USER);
- Collection<String> gResources = groupDAO.findAllResourceKeys(groupKey);
+ SearchCond cond = SearchCond.of(membershipCond);
+
+ long userCount = anySearchDAO.count(
+ realmDAO.getRoot(),
+ true,
+ SyncopeConstants.FULL_ADMIN_REALMS,
+ cond,
+ AnyTypeKind.USER);
+
setStatus("About to "
+ (action == ProvisionAction.DEPROVISION ? "de" : "") +
"provision "
- + users.size() + " users from " + gResources);
-
- for (User user : users) {
- List<PropagationStatus> statuses = action ==
ProvisionAction.DEPROVISION
- ? userProvisioningManager.deprovision(
- user.getKey(), gResources, false, executor)
- : userProvisioningManager.provision(
- user.getKey(), true, null, gResources, false,
executor);
- for (PropagationStatus propagationStatus : statuses) {
- result.append("User ").append(user.getKey()).append('\t').
- append("Resource ").append(propagationStatus.getResource()).append('\t').
- append(propagationStatus.getStatus());
- if
(StringUtils.isNotBlank(propagationStatus.getFailureReason())) {
-
result.append('\n').append(propagationStatus.getFailureReason()).append('\n');
+ + userCount + " users " + (action ==
ProvisionAction.DEPROVISION ? "from " : "to ") + gResources);
+
+ long pages = (userCount / AnyDAO.DEFAULT_PAGE_SIZE) + 1;
+ Sort sort = Sort.by(Sort.Direction.ASC, "creationDate");
+
+ for (int page = 0; page < pages && !stopRequested; page++) {
+ setStatus("Processing " + userCount + " users: page " + page + " of " + pages);
+
+ List<User> users = anySearchDAO.search(
+ realmDAO.getRoot(),
+ true,
+ SyncopeConstants.FULL_ADMIN_REALMS,
+ cond,
+ PageRequest.of(page, AnyDAO.DEFAULT_PAGE_SIZE, sort),
+ AnyTypeKind.USER);
+
+ for (int i = 0; i < users.size() && !stopRequested; i++) {
+ User user = users.get(i);
+
+ List<PropagationStatus> statuses = action ==
ProvisionAction.DEPROVISION
+ ? userProvisioningManager.deprovision(
+ user.getKey(), gResources, false, executor)
+ : userProvisioningManager.provision(
+ user.getKey(), true, null, gResources, false,
executor);
+ for (PropagationStatus propagationStatus : statuses) {
+ result.append("User ").append(user.getKey()).append('\t').
+ append("Resource ").append(propagationStatus.getResource()).append('\t').
+ append(propagationStatus.getStatus());
+ if
(StringUtils.isNotBlank(propagationStatus.getFailureReason())) {
+
result.append('\n').append(propagationStatus.getFailureReason()).append('\n');
+ }
+ result.append('\n');
}
result.append('\n');
}
- result.append('\n');
}
- membershipCond = new MembershipCond();
- membershipCond.setGroup(groupKey);
- List<AnyObject> anyObjects =
searchDAO.search(SearchCond.of(membershipCond), AnyTypeKind.ANY_OBJECT);
+ if (stopRequested) {
+ result.append("\nStop was requested");
+ return result.toString();
+ }
+
+ long anyObjectCount = anySearchDAO.count(
+ realmDAO.getRoot(),
+ true,
+ SyncopeConstants.FULL_ADMIN_REALMS,
+ SearchCond.of(membershipCond),
+ AnyTypeKind.ANY_OBJECT);
setStatus("About to "
+ (action == ProvisionAction.DEPROVISION ? "de" : "") +
"provision "
- + anyObjects.size() + " any objects from " + gResources);
-
- for (AnyObject anyObject : anyObjects) {
- List<PropagationStatus> statuses = action ==
ProvisionAction.DEPROVISION
- ? anyObjectProvisioningManager.deprovision(
- anyObject.getKey(), gResources, false, executor)
- : anyObjectProvisioningManager.provision(
- anyObject.getKey(), gResources, false, executor);
-
- for (PropagationStatus propagationStatus : statuses) {
- result.append(anyObject.getType().getKey()).append(' ').
- append(anyObject.getKey()).append('\t').
- append("Resource ").append(propagationStatus.getResource()).append('\t').
- append(propagationStatus.getStatus());
- if
(StringUtils.isNotBlank(propagationStatus.getFailureReason())) {
-
result.append('\n').append(propagationStatus.getFailureReason()).append('\n');
+ + anyObjectCount + " any objects from " + gResources);
+
+ pages = (anyObjectCount / AnyDAO.DEFAULT_PAGE_SIZE) + 1;
+
+ for (int page = 0; page < pages && !stopRequested; page++) {
+ setStatus("Processing " + anyObjectCount + " anyObjects: page " +
page + " of " + pages);
+
+ List<AnyObject> anyObjects = anySearchDAO.search(
+ realmDAO.getRoot(),
+ true,
+ SyncopeConstants.FULL_ADMIN_REALMS,
+ cond,
+ PageRequest.of(page, AnyDAO.DEFAULT_PAGE_SIZE, sort),
+ AnyTypeKind.ANY_OBJECT);
+
+ for (int i = 0; i < anyObjects.size() && !stopRequested; i++) {
+ AnyObject anyObject = anyObjects.get(i);
+
+ List<PropagationStatus> statuses = action ==
ProvisionAction.DEPROVISION
+ ? anyObjectProvisioningManager.deprovision(
+ anyObject.getKey(), gResources, false,
executor)
+ : anyObjectProvisioningManager.provision(
+ anyObject.getKey(), gResources, false,
executor);
+
+ for (PropagationStatus propagationStatus : statuses) {
+ result.append(anyObject.getType().getKey()).append(' ').
+ append(anyObject.getKey()).append('\t').
+ append("Resource ").append(propagationStatus.getResource()).append('\t').
+ append(propagationStatus.getStatus());
+ if
(StringUtils.isNotBlank(propagationStatus.getFailureReason())) {
+
result.append('\n').append(propagationStatus.getFailureReason()).append('\n');
+ }
+ result.append('\n');
}
result.append('\n');
}
- result.append('\n');
+ }
+
+ if (stopRequested) {
+ result.append("\nStop was requested");
}
return result.toString();
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/Job.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/Job.java
index a30be09722..8304df1dde 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/Job.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/Job.java
@@ -20,6 +20,7 @@ package org.apache.syncope.core.provisioning.java.job;
import java.util.Optional;
import org.apache.syncope.core.persistence.api.dao.JobStatusDAO;
+import org.apache.syncope.core.provisioning.api.job.JobDelegate;
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext;
import org.apache.syncope.core.provisioning.api.job.JobExecutionException;
import org.apache.syncope.core.spring.security.AuthContextUtils;
@@ -47,6 +48,8 @@ public abstract class Job implements Runnable {
this.context = context;
}
+ protected abstract JobDelegate getDelegate();
+
protected abstract void execute(JobExecutionContext context) throws
JobExecutionException;
@Override
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java
index fb7201f0d3..98b6f636f3 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java
@@ -55,6 +55,7 @@ import
org.apache.syncope.core.persistence.api.utils.FormatUtils;
import org.apache.syncope.core.provisioning.api.jexl.JexlTools;
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext;
import org.apache.syncope.core.provisioning.api.job.JobExecutionException;
+import org.apache.syncope.core.provisioning.api.job.StoppableJobDelegate;
import org.apache.syncope.core.provisioning.api.macro.Command;
import org.apache.syncope.core.provisioning.api.macro.MacroActions;
import org.apache.syncope.core.provisioning.api.serialization.POJOHelper;
@@ -65,7 +66,7 @@ import org.springframework.core.task.AsyncTaskExecutor;
import
org.springframework.security.concurrent.DelegatingSecurityContextCallable;
import org.springframework.util.ReflectionUtils;
-public class MacroJobDelegate extends AbstractSchedTaskJobDelegate<MacroTask> {
+public class MacroJobDelegate extends AbstractSchedTaskJobDelegate<MacroTask>
implements StoppableJobDelegate {
public static final String MACRO_TASK_FORM_JOBDETAIL_KEY = "macroTaskForm";
@@ -81,6 +82,8 @@ public class MacroJobDelegate extends
AbstractSchedTaskJobDelegate<MacroTask> {
@Autowired
protected JexlTools jexlTools;
+ protected volatile boolean stopRequested = false;
+
protected final Map<String, MacroActions> perContextActions = new
ConcurrentHashMap<>();
protected final Map<String, Command<?>> perContextCommands = new
ConcurrentHashMap<>();
@@ -203,7 +206,7 @@ public class MacroJobDelegate extends
AbstractSchedTaskJobDelegate<MacroTask> {
Mutable<Pair<String, Throwable>> error = new
MutableObject<>();
- for (int i = 0; i < commands.size() && error.get() ==
null; i++) {
+ for (int i = 0; i < commands.size() && !stopRequested &&
error.get() == null; i++) {
Pair<Command<CommandArgs>, CommandArgs> command =
commands.get(i);
try {
@@ -251,9 +254,17 @@ public class MacroJobDelegate extends
AbstractSchedTaskJobDelegate<MacroTask> {
throw new JobExecutionException("While waiting for macro commands completion", e);
}
+ if (stopRequested) {
+ output.append("\nStop was requested");
+ }
output.append("COMPLETED");
}
+ @Override
+ public void stop() {
+ stopRequested = true;
+ }
+
@SuppressWarnings("unchecked")
@Override
protected String doExecute(final JobExecutionContext context) throws
JobExecutionException {
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/SyncopeTaskScheduler.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/SyncopeTaskScheduler.java
index 413f2335dd..51382daabd 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/SyncopeTaskScheduler.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/SyncopeTaskScheduler.java
@@ -29,7 +29,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.syncope.core.persistence.api.dao.JobStatusDAO;
-import
org.apache.syncope.core.provisioning.api.job.StoppableSchedTaskJobDelegate;
+import org.apache.syncope.core.provisioning.api.job.StoppableJobDelegate;
import org.apache.syncope.core.spring.security.AuthContextUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,9 +99,7 @@ public class SyncopeTaskScheduler {
protected void stop(final Key key, final List<Function<Value,
Optional<ScheduledFuture<?>>>> suppliers) {
Optional.ofNullable(jobs.get(key)).ifPresent(value -> {
boolean mayInterruptIfRunning;
- if (value.job() instanceof TaskJob taskJob
- && taskJob.getDelegate() instanceof
StoppableSchedTaskJobDelegate stoppable) {
-
+ if (value.job().getDelegate() instanceof StoppableJobDelegate
stoppable) {
stoppable.stop();
mayInterruptIfRunning = false;
} else {
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/SystemLoadReporterJob.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/SystemLoadReporterJob.java
index 82b27baa84..5bd81ae401 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/SystemLoadReporterJob.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/SystemLoadReporterJob.java
@@ -20,7 +20,9 @@ package org.apache.syncope.core.provisioning.java.job;
import java.lang.management.ManagementFactory;
import org.apache.syncope.common.lib.info.SystemInfo;
+import org.apache.syncope.core.provisioning.api.job.JobDelegate;
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
/**
@@ -30,10 +32,12 @@ public class SystemLoadReporterJob extends Job {
protected static final Integer MB = 1024 * 1024;
- protected final ApplicationEventPublisher publisher;
+ @Autowired
+ protected ApplicationEventPublisher publisher;
- public SystemLoadReporterJob(final ApplicationEventPublisher publisher) {
- this.publisher = publisher;
+ @Override
+ protected JobDelegate getDelegate() {
+ return null;
}
@Override
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/NotificationJob.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/NotificationJob.java
index f71a09f9c5..54210ce8f8 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/NotificationJob.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/NotificationJob.java
@@ -63,6 +63,11 @@ public class NotificationJob extends Job {
this.delegate = delegate;
}
+ @Override
+ public NotificationJobDelegate getDelegate() {
+ return delegate;
+ }
+
@Override
protected void execute(final JobExecutionContext context) throws
JobExecutionException {
LOG.debug("Waking up...");
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AbstractReportJobDelegate.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AbstractReportJobDelegate.java
index 3ae8f49882..a378def165 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AbstractReportJobDelegate.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AbstractReportJobDelegate.java
@@ -38,6 +38,7 @@ import
org.apache.syncope.core.provisioning.api.event.JobStatusEvent;
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext;
import org.apache.syncope.core.provisioning.api.job.JobExecutionException;
import org.apache.syncope.core.provisioning.api.job.JobNamer;
+import org.apache.syncope.core.provisioning.api.job.StoppableJobDelegate;
import org.apache.syncope.core.provisioning.api.job.report.ReportJobDelegate;
import
org.apache.syncope.core.provisioning.api.notification.NotificationManager;
import org.apache.syncope.core.spring.security.AuthContextUtils;
@@ -48,7 +49,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.transaction.annotation.Transactional;
-public abstract class AbstractReportJobDelegate implements ReportJobDelegate {
+public abstract class AbstractReportJobDelegate implements ReportJobDelegate,
StoppableJobDelegate {
protected static final Logger LOG =
LoggerFactory.getLogger(ReportJobDelegate.class);
@@ -86,6 +87,8 @@ public abstract class AbstractReportJobDelegate implements
ReportJobDelegate {
@Autowired
protected ApplicationEventPublisher publisher;
+ protected volatile boolean stopRequested = false;
+
@Override
public void setConf(final ReportConf conf) {
this.conf = conf;
@@ -96,6 +99,11 @@ public abstract class AbstractReportJobDelegate implements
ReportJobDelegate {
this, AuthContextUtils.getDomain(),
JobNamer.getJobName(report), status));
}
+ @Override
+ public void stop() {
+ stopRequested = true;
+ }
+
@Transactional
@Override
public void execute(
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/ReportJob.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/ReportJob.java
index 92bf027012..31c9de5041 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/ReportJob.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/ReportJob.java
@@ -59,6 +59,31 @@ public class ReportJob extends Job {
@Autowired
private DomainHolder<?> domainHolder;
+ private ReportJobDelegate delegate;
+
+ @Override
+ public ReportJobDelegate getDelegate() {
+ return delegate;
+ }
+
+ protected void delegate(final JobExecutionContext context, final String
reportKey)
+ throws ClassNotFoundException, JobExecutionException {
+
+ String implKey = (String)
context.getData().get(JobManager.DELEGATE_IMPLEMENTATION);
+ Implementation impl = implementationDAO.findById(implKey).orElse(null);
+ if (impl == null) {
+ LOG.error("Could not find Implementation '{}', aborting", implKey);
+ } else {
+ delegate = ImplementationManager.buildReportJobDelegate(
+ impl,
+ () -> perContextReportJobDelegates.get(impl.getKey()),
+ instance ->
perContextReportJobDelegates.put(impl.getKey(), instance)).
+ orElseThrow(() -> new IllegalArgumentException(
+ "Could not instantiate " + impl.getBody()));
+ delegate.execute(reportKey, context);
+ }
+ }
+
@Override
protected void execute(final JobExecutionContext context) throws
JobExecutionException {
if (!domainHolder.getDomains().containsKey(context.getDomain())) {
@@ -70,21 +95,11 @@ public class ReportJob extends Job {
try {
AuthContextUtils.runAsAdmin(context.getDomain(), () -> {
try {
- String implKey = (String)
context.getData().get(JobManager.DELEGATE_IMPLEMENTATION);
- Implementation impl =
implementationDAO.findById(implKey).orElse(null);
- if (impl == null) {
- LOG.error("Could not find Implementation '{}',
aborting", implKey);
- } else {
- ReportJobDelegate delegate =
ImplementationManager.buildReportJobDelegate(
- impl,
- () ->
perContextReportJobDelegates.get(impl.getKey()),
- instance ->
perContextReportJobDelegates.put(impl.getKey(), instance)).
- orElseThrow(() -> new IllegalArgumentException(
- "Could not instantiate " + impl.getBody()));
- delegate.execute(reportKey, context);
- }
+ delegate(context, reportKey);
} catch (Exception e) {
- LOG.error("While executing report {}", reportKey, e);
+ if (e instanceof RuntimeException re) {
+ throw re;
+ }
throw new RuntimeException(e);
}
});
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/LiveSyncJobDelegate.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/LiveSyncJobDelegate.java
index 003f7ed77e..4726995046 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/LiveSyncJobDelegate.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/LiveSyncJobDelegate.java
@@ -48,7 +48,7 @@ import
org.apache.syncope.core.persistence.api.entity.task.TaskExec;
import org.apache.syncope.core.persistence.api.utils.ExceptionUtils2;
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext;
import org.apache.syncope.core.provisioning.api.job.JobExecutionException;
-import
org.apache.syncope.core.provisioning.api.job.StoppableSchedTaskJobDelegate;
+import org.apache.syncope.core.provisioning.api.job.StoppableJobDelegate;
import org.apache.syncope.core.provisioning.api.pushpull.InboundActions;
import org.apache.syncope.core.provisioning.api.pushpull.LiveSyncDeltaMapper;
import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile;
@@ -68,7 +68,7 @@ import
org.springframework.transaction.annotation.Transactional;
public class LiveSyncJobDelegate
extends AbstractPullExecutor<LiveSyncTask>
- implements SyncopePullExecutor, StoppableSchedTaskJobDelegate {
+ implements SyncopePullExecutor, StoppableJobDelegate {
protected record LiveSyncInfo(
Provision provision,
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PullJobDelegate.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PullJobDelegate.java
index caa1547494..32ed2fb6bd 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PullJobDelegate.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PullJobDelegate.java
@@ -33,7 +33,7 @@ import
org.apache.syncope.core.persistence.api.entity.policy.InboundPolicy;
import org.apache.syncope.core.persistence.api.entity.task.PullTask;
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext;
import org.apache.syncope.core.provisioning.api.job.JobExecutionException;
-import
org.apache.syncope.core.provisioning.api.job.StoppableSchedTaskJobDelegate;
+import org.apache.syncope.core.provisioning.api.job.StoppableJobDelegate;
import org.apache.syncope.core.provisioning.api.pushpull.AnyPullResultHandler;
import org.apache.syncope.core.provisioning.api.pushpull.InboundActions;
import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile;
@@ -49,7 +49,7 @@ import
org.identityconnectors.framework.common.objects.OperationOptions;
public class PullJobDelegate
extends AbstractPullExecutor<PullTask>
- implements SyncopePullExecutor, StoppableSchedTaskJobDelegate {
+ implements SyncopePullExecutor, StoppableJobDelegate {
protected Optional<ReconFilterBuilder> perContextReconFilterBuilder =
Optional.empty();
diff --git
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PushJobDelegate.java
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PushJobDelegate.java
index fd2613e37a..aa092cdccf 100644
---
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PushJobDelegate.java
+++
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PushJobDelegate.java
@@ -47,7 +47,7 @@ import
org.apache.syncope.core.persistence.api.search.SearchCondConverter;
import org.apache.syncope.core.provisioning.api.ProvisionSorter;
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext;
import org.apache.syncope.core.provisioning.api.job.JobExecutionException;
-import
org.apache.syncope.core.provisioning.api.job.StoppableSchedTaskJobDelegate;
+import org.apache.syncope.core.provisioning.api.job.StoppableJobDelegate;
import org.apache.syncope.core.provisioning.api.pushpull.AnyPushResultHandler;
import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile;
import org.apache.syncope.core.provisioning.api.pushpull.PushActions;
@@ -62,7 +62,7 @@ import org.springframework.data.domain.PageRequest;
public class PushJobDelegate
extends AbstractProvisioningJobDelegate<PushTask>
- implements SyncopePushExecutor, StoppableSchedTaskJobDelegate {
+ implements SyncopePushExecutor, StoppableJobDelegate {
@Autowired
protected AnySearchDAO searchDAO;
diff --git
a/core/provisioning-java/src/test/java/org/apache/syncope/core/provisioning/java/job/SyncopeTaskSchedulerTest.java
b/core/provisioning-java/src/test/java/org/apache/syncope/core/provisioning/java/job/SyncopeTaskSchedulerTest.java
index 3151877646..c637754d7c 100644
---
a/core/provisioning-java/src/test/java/org/apache/syncope/core/provisioning/java/job/SyncopeTaskSchedulerTest.java
+++
b/core/provisioning-java/src/test/java/org/apache/syncope/core/provisioning/java/job/SyncopeTaskSchedulerTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.syncope.core.persistence.api.dao.JobStatusDAO;
+import org.apache.syncope.core.provisioning.api.job.JobDelegate;
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext;
import org.apache.syncope.core.provisioning.api.job.JobExecutionException;
import org.apache.syncope.core.provisioning.java.AbstractTest;
@@ -47,6 +48,11 @@ public class SyncopeTaskSchedulerTest extends AbstractTest {
private static class TestJob extends Job {
+ @Override
+ protected JobDelegate getDelegate() {
+ return null;
+ }
+
@Override
protected void execute(final JobExecutionContext context) throws
JobExecutionException {
VALUE.setValue(1);
diff --git
a/core/starter/src/main/java/org/apache/syncope/core/starter/SyncopeCoreApplication.java
b/core/starter/src/main/java/org/apache/syncope/core/starter/SyncopeCoreApplication.java
index b204baa3ae..02cf8d8f1f 100644
---
a/core/starter/src/main/java/org/apache/syncope/core/starter/SyncopeCoreApplication.java
+++
b/core/starter/src/main/java/org/apache/syncope/core/starter/SyncopeCoreApplication.java
@@ -31,6 +31,7 @@ import org.apache.syncope.core.persistence.api.dao.AnyTypeDAO;
import org.apache.syncope.core.persistence.api.dao.EntityCacheDAO;
import org.apache.syncope.core.persistence.api.dao.ExternalResourceDAO;
import org.apache.syncope.core.persistence.api.dao.GroupDAO;
+import org.apache.syncope.core.persistence.api.dao.JobStatusDAO;
import org.apache.syncope.core.persistence.api.dao.NotificationDAO;
import org.apache.syncope.core.persistence.api.dao.PersistenceInfoDAO;
import org.apache.syncope.core.persistence.api.dao.PolicyDAO;
@@ -185,8 +186,8 @@ public class SyncopeCoreApplication extends
SpringBootServletInitializer {
@ConditionalOnMissingBean
@Bean
- public JobEndpoint jobEndpoint(final SyncopeTaskScheduler
syncopeTaskScheduler) {
- return new JobEndpoint(syncopeTaskScheduler);
+ public JobEndpoint jobEndpoint(final SyncopeTaskScheduler
syncopeTaskScheduler, final JobStatusDAO jobStatusDAO) {
+ return new JobEndpoint(syncopeTaskScheduler, jobStatusDAO);
}
@Bean
diff --git
a/core/starter/src/main/java/org/apache/syncope/core/starter/actuate/JobEndpoint.java
b/core/starter/src/main/java/org/apache/syncope/core/starter/actuate/JobEndpoint.java
index ba9508a104..9f1f44ec74 100644
---
a/core/starter/src/main/java/org/apache/syncope/core/starter/actuate/JobEndpoint.java
+++
b/core/starter/src/main/java/org/apache/syncope/core/starter/actuate/JobEndpoint.java
@@ -19,10 +19,17 @@
package org.apache.syncope.core.starter.actuate;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.syncope.common.lib.types.JobAction;
+import org.apache.syncope.core.persistence.api.dao.JobStatusDAO;
+import org.apache.syncope.core.persistence.api.entity.JobStatus;
import org.apache.syncope.core.provisioning.java.job.SyncopeTaskScheduler;
+import org.apache.syncope.core.spring.security.AuthContextUtils;
+import org.springframework.boot.actuate.endpoint.annotation.DeleteOperation;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.boot.actuate.endpoint.annotation.Selector;
@@ -33,14 +40,18 @@ public class JobEndpoint {
protected final SyncopeTaskScheduler syncopeTaskScheduler;
- public JobEndpoint(final SyncopeTaskScheduler syncopeTaskScheduler) {
+ protected final JobStatusDAO jobStatusDAO;
+
+ public JobEndpoint(final SyncopeTaskScheduler syncopeTaskScheduler, final
JobStatusDAO jobStatusDAO) {
this.syncopeTaskScheduler = syncopeTaskScheduler;
+ this.jobStatusDAO = jobStatusDAO;
}
@ReadOperation
public Map<String, Object> status() {
- Map<String, Object> status = new HashMap<>();
+ Map<String, Object> status = new LinkedHashMap<>();
+ // first information about jobs defined in the Scheduler
syncopeTaskScheduler.getJobs().forEach((k, v) -> {
@SuppressWarnings("unchecked")
Map<String, Object> jobs = (Map<String, Object>)
status.computeIfAbsent(k.domain(), d -> new HashMap<>());
@@ -61,6 +72,27 @@ public class JobEndpoint {
});
});
+ // then check if there are jobs not reconciled, i.e. reported by JobStatusDAO but not by Scheduler
+ // (potentially running in another node of the cluster)
+ Map<String, Object> unreconciled = new HashMap<>();
+
+ status.keySet().forEach(domain -> {
+ Set<JobStatus> jobStatuses = new HashSet<>(
+ AuthContextUtils.callAsAdmin(domain, () -> jobStatusDAO.findAll()));
+ // match on the job status key: the Scheduler reports job names, never JobStatus instances
+ jobStatuses.removeIf(jobStatus -> syncopeTaskScheduler.getJobNames(domain).contains(jobStatus.getKey()));
+ if (!jobStatuses.isEmpty()) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> jobs =
+ (Map<String, Object>) unreconciled.computeIfAbsent(domain, d -> new HashMap<>());
+
+ jobStatuses.forEach(notfound -> jobs.put(notfound.getKey(), notfound.getStatus()));
+ }
+ });
+ if (!unreconciled.isEmpty()) {
+ status.put("unreconciled", unreconciled);
+ }
+
return status;
}
@@ -84,4 +116,9 @@ public class JobEndpoint {
}
}
}
+
+ @DeleteOperation
+ public void forceUnlock(final @Selector String domain, final @Selector
String jobName) {
+ AuthContextUtils.runAsAdmin(domain, () ->
jobStatusDAO.unlock(jobName));
+ }
}