This is an automated email from the ASF dual-hosted git repository.
shuber pushed a commit to branch UNOMI-878-new-scheduler
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/UNOMI-878-new-scheduler by
this push:
new 957f0259e UNOMI-878: Enhanced Cluster-Aware Task Scheduling Service
with Improved Developer Experience and Persistence Integration
957f0259e is described below
commit 957f0259e3c65668a94e34bea2592c49da660452
Author: Serge Huber <[email protected]>
AuthorDate: Mon Sep 1 14:44:00 2025 +0200
UNOMI-878: Enhanced Cluster-Aware Task Scheduling Service with Improved
Developer Experience and Persistence Integration
---
.../org/apache/unomi/api/ExecutionContext.java | 98 ++++++++
.../api/services/ExecutionContextManager.java | 78 ++++++
.../geonames/services/GeonamesServiceImpl.java | 266 +++++++++++++--------
.../services/impl/GroovyActionsServiceImpl.java | 46 ++--
.../unomi/schema/impl/SchemaServiceImpl.java | 44 ++--
.../resources/OSGI-INF/blueprint/blueprint.xml | 4 +-
.../actions/MergeProfilesOnPropertyAction.java | 72 ++++--
.../services/impl/cluster/ClusterServiceImpl.java | 20 +-
.../impl/definitions/DefinitionsServiceImpl.java | 26 +-
.../services/impl/profiles/ProfileServiceImpl.java | 38 +--
.../services/impl/rules/RulesServiceImpl.java | 32 ++-
.../services/impl/scope/ScopeServiceImpl.java | 18 +-
.../services/impl/segments/SegmentServiceImpl.java | 26 +-
.../resources/OSGI-INF/blueprint/blueprint.xml | 2 -
.../commands/scheduler/CancelTaskCommand.java | 42 ++++
.../shell/commands/scheduler/ListTasksCommand.java | 135 +++++++++++
.../commands/scheduler/PurgeTasksCommand.java | 93 +++++++
.../shell/commands/scheduler/RetryTaskCommand.java | 47 ++++
.../commands/scheduler/SetExecutorNodeCommand.java | 54 +++++
.../shell/commands/scheduler/ShowTaskCommand.java | 99 ++++++++
20 files changed, 1020 insertions(+), 220 deletions(-)
diff --git a/api/src/main/java/org/apache/unomi/api/ExecutionContext.java
b/api/src/main/java/org/apache/unomi/api/ExecutionContext.java
new file mode 100644
index 000000000..1fcf5a7ba
--- /dev/null
+++ b/api/src/main/java/org/apache/unomi/api/ExecutionContext.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.api;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Stack;
+
+/**
+ * Represents the execution context for operations in Unomi, including
security and tenant information.
+ */
+public class ExecutionContext {
+ public static final String SYSTEM_TENANT = "system";
+
+ private String tenantId;
+ private Set<String> roles = new HashSet<>();
+ private Set<String> permissions = new HashSet<>();
+ private Stack<String> tenantStack = new Stack<>();
+ private boolean isSystem = false;
+
+ public ExecutionContext(String tenantId, Set<String> roles, Set<String>
permissions) {
+ this.tenantId = tenantId;
+ if (tenantId != null && tenantId.equals(SYSTEM_TENANT)) {
+ this.isSystem = true;
+ }
+ if (roles != null) {
+ this.roles.addAll(roles);
+ }
+ if (permissions != null) {
+ this.permissions.addAll(permissions);
+ }
+ }
+
+ public static ExecutionContext systemContext() {
+ ExecutionContext context = new ExecutionContext(SYSTEM_TENANT, null,
null);
+ context.isSystem = true;
+ return context;
+ }
+
+ public String getTenantId() {
+ return tenantId;
+ }
+
+ public Set<String> getRoles() {
+ return new HashSet<>(roles);
+ }
+
+ public Set<String> getPermissions() {
+ return new HashSet<>(permissions);
+ }
+
+ public boolean isSystem() {
+ return isSystem;
+ }
+
+ public void setTenant(String tenantId) {
+ tenantStack.push(this.tenantId);
+ this.tenantId = tenantId;
+ }
+
+ public void restorePreviousTenant() {
+ if (!tenantStack.isEmpty()) {
+ this.tenantId = tenantStack.pop();
+ }
+ }
+
+ public void validateAccess(String operation) {
+ if (isSystem) {
+ return;
+ }
+
+ if (!hasPermission(operation)) {
+ throw new SecurityException("Access denied: Missing permission for
operation " + operation + " for tenant " + tenantId + " and roles " + roles);
+ }
+ }
+
+ public boolean hasPermission(String permission) {
+ return isSystem || permissions.contains(permission);
+ }
+
+ public boolean hasRole(String role) {
+ return isSystem || roles.contains(role);
+ }
+}
\ No newline at end of file
diff --git
a/api/src/main/java/org/apache/unomi/api/services/ExecutionContextManager.java
b/api/src/main/java/org/apache/unomi/api/services/ExecutionContextManager.java
new file mode 100644
index 000000000..da1ab18a0
--- /dev/null
+++
b/api/src/main/java/org/apache/unomi/api/services/ExecutionContextManager.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.api.services;
+
+import org.apache.unomi.api.ExecutionContext;
+
+import java.util.function.Supplier;
+
+/**
+ * Service interface for managing execution contexts in Unomi.
+ */
+public interface ExecutionContextManager {
+
+ /**
+ * Gets the current execution context.
+ * @return the current execution context
+ */
+ ExecutionContext getCurrentContext();
+
+ /**
+ * Sets the current execution context.
+ * @param context the context to set as current
+ */
+ void setCurrentContext(ExecutionContext context);
+
+ /**
+ * Executes an operation as the system user.
+ * @param operation the operation to execute
+ * @param <T> the return type of the operation
+ * @return the result of the operation
+ */
+ <T> T executeAsSystem(Supplier<T> operation);
+
+ /**
+ * Executes an operation as the system user without return value.
+ * @param operation the operation to execute
+ */
+ void executeAsSystem(Runnable operation);
+
+ /**
+ * Executes an operation as a specific tenant.
+ * This method creates a tenant context, executes the operation, and
ensures proper cleanup.
+ * @param tenantId the ID of the tenant to execute as
+ * @param operation the operation to execute
+ * @param <T> the return type of the operation
+ * @return the result of the operation
+ */
+ <T> T executeAsTenant(String tenantId, Supplier<T> operation);
+
+ /**
+ * Executes an operation as a specific tenant without return value.
+ * This method creates a tenant context, executes the operation, and
ensures proper cleanup.
+ * @param tenantId the ID of the tenant to execute as
+ * @param operation the operation to execute
+ */
+ void executeAsTenant(String tenantId, Runnable operation);
+
+ /**
+ * Creates a new execution context for the given tenant.
+ * @param tenantId the tenant ID
+ * @return the created execution context
+ */
+ ExecutionContext createContext(String tenantId);
+}
diff --git
a/extensions/geonames/services/src/main/java/org/apache/unomi/geonames/services/GeonamesServiceImpl.java
b/extensions/geonames/services/src/main/java/org/apache/unomi/geonames/services/GeonamesServiceImpl.java
index a19725035..f84c19415 100644
---
a/extensions/geonames/services/src/main/java/org/apache/unomi/geonames/services/GeonamesServiceImpl.java
+++
b/extensions/geonames/services/src/main/java/org/apache/unomi/geonames/services/GeonamesServiceImpl.java
@@ -17,12 +17,15 @@
package org.apache.unomi.geonames.services;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.unomi.api.PartialList;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.services.DefinitionsService;
+import org.apache.unomi.api.services.ExecutionContextManager;
import org.apache.unomi.api.services.SchedulerService;
+import org.apache.unomi.api.tasks.TaskExecutor;
+import org.apache.unomi.api.tasks.TaskExecutor.TaskStatusCallback;
+import org.apache.unomi.api.tasks.ScheduledTask;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +46,7 @@ public class GeonamesServiceImpl implements GeonamesService {
private DefinitionsService definitionsService;
private PersistenceService persistenceService;
private SchedulerService schedulerService;
+ private ExecutionContextManager contextManager;
private String pathToGeonamesDatabase;
private Boolean forceDbImport;
@@ -64,6 +68,10 @@ public class GeonamesServiceImpl implements GeonamesService {
this.schedulerService = schedulerService;
}
+ public void setContextManager(ExecutionContextManager contextManager) {
+ this.contextManager = contextManager;
+ }
+
public void setPathToGeonamesDatabase(String pathToGeonamesDatabase) {
this.pathToGeonamesDatabase = pathToGeonamesDatabase;
}
@@ -79,47 +87,99 @@ public class GeonamesServiceImpl implements GeonamesService
{
public void stop() {
}
- public void importDatabase() {
- if (!persistenceService.createIndex(GeonameEntry.ITEM_TYPE)) {
- if (forceDbImport) {
- persistenceService.removeIndex(GeonameEntry.ITEM_TYPE);
- persistenceService.createIndex(GeonameEntry.ITEM_TYPE);
- LOGGER.info("Geonames index removed and recreated");
- } else if
(persistenceService.getAllItemsCount(GeonameEntry.ITEM_TYPE) > 0) {
- return;
- }
- } else {
- LOGGER.info("Geonames index created");
+ private static class GeonamesImportTaskExecutor implements TaskExecutor {
+ private final GeonamesServiceImpl service;
+ private final File databaseFile;
+
+ public GeonamesImportTaskExecutor(GeonamesServiceImpl service, File
databaseFile) {
+ this.service = service;
+ this.databaseFile = databaseFile;
}
- if (pathToGeonamesDatabase == null) {
- LOGGER.info("No geonames DB provided");
- return;
+ @Override
+ public String getTaskType() {
+ return "geonames-import";
}
- final File f = new File(pathToGeonamesDatabase);
- if (f.exists()) {
- schedulerService.getSharedScheduleExecutorService().schedule(new
TimerTask() {
- @Override
- public void run() {
- importGeoNameDatabase(f);
+
+ @Override
+ public void execute(ScheduledTask task, TaskStatusCallback
statusCallback) throws Exception {
+ service.contextManager.executeAsSystem(() -> {
+ try {
+ service.importGeoNameDatabase(databaseFile);
+ statusCallback.complete();
+ } catch (Exception e) {
+ LOGGER.error("Error importing geoname database", e);
+ statusCallback.fail(e.getMessage());
}
- }, refreshDbInterval, TimeUnit.MILLISECONDS);
+ return null;
+ });
+ }
+ }
+
+ private static class GeonamesImportRetryTaskExecutor implements
TaskExecutor {
+ private final GeonamesServiceImpl service;
+ private final File databaseFile;
+
+ public GeonamesImportRetryTaskExecutor(GeonamesServiceImpl service,
File databaseFile) {
+ this.service = service;
+ this.databaseFile = databaseFile;
+ }
+
+ @Override
+ public String getTaskType() {
+ return "geonames-import-retry";
}
+
+ @Override
+ public void execute(ScheduledTask task, TaskStatusCallback
statusCallback) throws Exception {
+ service.importGeoNameDatabase(databaseFile);
+ statusCallback.complete();
+ }
+ }
+
+ public void importDatabase() {
+ contextManager.executeAsSystem(() -> {
+ if (!persistenceService.createIndex(GeonameEntry.ITEM_TYPE)) {
+ if (forceDbImport) {
+ persistenceService.removeIndex(GeonameEntry.ITEM_TYPE);
+ persistenceService.createIndex(GeonameEntry.ITEM_TYPE);
+ LOGGER.info("Geonames index removed and recreated");
+ } else if
(persistenceService.getAllItemsCount(GeonameEntry.ITEM_TYPE) > 0) {
+ return;
+ }
+ } else {
+ LOGGER.info("Geonames index created");
+ }
+
+ if (pathToGeonamesDatabase == null) {
+ LOGGER.info("No geonames DB provided");
+ return;
+ }
+ final File f = new File(pathToGeonamesDatabase);
+ if (f.exists()) {
+ schedulerService.newTask("geonames-import")
+ .withInitialDelay(refreshDbInterval, TimeUnit.MILLISECONDS)
+ .asOneShot()
+ .withExecutor(new GeonamesImportTaskExecutor(this, f))
+ .nonPersistent()
+ .schedule();
+ }
+ });
}
private void importGeoNameDatabase(final File f) {
Map<String,Map<String,Object>> typeMappings =
persistenceService.getPropertiesMapping(GeonameEntry.ITEM_TYPE);
if (typeMappings == null || typeMappings.size() == 0) {
LOGGER.warn("Type mappings for type {} are not yet installed,
delaying import until they are ready!", GeonameEntry.ITEM_TYPE);
- schedulerService.getSharedScheduleExecutorService().schedule(new
TimerTask() {
- @Override
- public void run() {
- importGeoNameDatabase(f);
- }
- }, refreshDbInterval, TimeUnit.MILLISECONDS);
+ schedulerService.newTask("geonames-import-retry")
+ .withInitialDelay(refreshDbInterval, TimeUnit.MILLISECONDS)
+ .asOneShot()
+ .withExecutor(new GeonamesImportRetryTaskExecutor(this, f))
+ .nonPersistent()
+ .schedule();
return;
} else {
- // let's check that the mappings are correct
+ // @TODO: let's check that the mappings are correct
}
try {
@@ -229,48 +289,50 @@ public class GeonamesServiceImpl implements
GeonamesService {
}
public List<GeonameEntry> reverseGeoCode(String lat, String lon) {
- List<Condition> l = new ArrayList<Condition>();
- Condition andCondition = new Condition();
-
andCondition.setConditionType(definitionsService.getConditionType("booleanCondition"));
- andCondition.setParameter("operator", "and");
- andCondition.setParameter("subConditions", l);
-
-
- Condition geoLocation = new Condition();
-
geoLocation.setConditionType(definitionsService.getConditionType("geoLocationByPointSessionCondition"));
- geoLocation.setParameter("type", "circle");
- geoLocation.setParameter("circleLatitude", Double.parseDouble(lat));
- geoLocation.setParameter("circleLongitude", Double.parseDouble(lon));
- geoLocation.setParameter("distance", GEOCODING_MAX_DISTANCE);
- l.add(geoLocation);
-
- l.add(getPropertyCondition("featureCode", "propertyValues",
CITIES_FEATURE_CODES, "in"));
-
- PartialList<GeonameEntry> list =
persistenceService.query(andCondition, "geo:location:" + lat + ":" + lon,
GeonameEntry.class, 0, 1);
- if (!list.getList().isEmpty()) {
- return getHierarchy(list.getList().get(0));
- }
- return Collections.emptyList();
+ return contextManager.executeAsSystem(() -> {
+ List<Condition> l = new ArrayList<Condition>();
+ Condition andCondition = new Condition();
+
andCondition.setConditionType(definitionsService.getConditionType("booleanCondition"));
+ andCondition.setParameter("operator", "and");
+ andCondition.setParameter("subConditions", l);
+
+ Condition geoLocation = new Condition();
+
geoLocation.setConditionType(definitionsService.getConditionType("geoLocationByPointSessionCondition"));
+ geoLocation.setParameter("type", "circle");
+ geoLocation.setParameter("circleLatitude",
Double.parseDouble(lat));
+ geoLocation.setParameter("circleLongitude",
Double.parseDouble(lon));
+ geoLocation.setParameter("distance", GEOCODING_MAX_DISTANCE);
+ l.add(geoLocation);
+
+ l.add(getPropertyCondition("featureCode", "propertyValues",
CITIES_FEATURE_CODES, "in"));
+
+ PartialList<GeonameEntry> list =
persistenceService.query(andCondition, "geo:location:" + lat + ":" + lon,
GeonameEntry.class, 0, 1);
+ if (!list.getList().isEmpty()) {
+ return getHierarchy(list.getList().get(0));
+ }
+ return Collections.emptyList();
+ });
}
-
public PartialList<GeonameEntry> getChildrenEntries(List<String> items,
int offset, int size) {
- Condition andCondition = getItemsInChildrenQuery(items,
CITIES_FEATURE_CODES);
- Condition featureCodeCondition = ((List<Condition>)
andCondition.getParameter("subConditions")).get(0);
- int level = items.size();
-
- featureCodeCondition.setParameter("propertyValues",
ORDERED_FEATURES.get(level));
- PartialList<GeonameEntry> r = persistenceService.query(andCondition,
null, GeonameEntry.class, offset, size);
- while (r.size() == 0 && level < ORDERED_FEATURES.size() - 1) {
- level++;
+ return contextManager.executeAsSystem(() -> {
+ Condition andCondition = getItemsInChildrenQuery(items,
CITIES_FEATURE_CODES);
+ Condition featureCodeCondition = ((List<Condition>)
andCondition.getParameter("subConditions")).get(0);
+ int level = items.size();
+
featureCodeCondition.setParameter("propertyValues",
ORDERED_FEATURES.get(level));
- r = persistenceService.query(andCondition, null,
GeonameEntry.class, offset, size);
- }
- return r;
+ PartialList<GeonameEntry> r =
persistenceService.query(andCondition, null, GeonameEntry.class, offset, size);
+ while (r.size() == 0 && level < ORDERED_FEATURES.size() - 1) {
+ level++;
+ featureCodeCondition.setParameter("propertyValues",
ORDERED_FEATURES.get(level));
+ r = persistenceService.query(andCondition, null,
GeonameEntry.class, offset, size);
+ }
+ return r;
+ });
}
public PartialList<GeonameEntry> getChildrenCities(List<String> items, int
offset, int size) {
- return persistenceService.query(getItemsInChildrenQuery(items,
CITIES_FEATURE_CODES), null, GeonameEntry.class, offset, size);
+ return contextManager.executeAsSystem(() ->
persistenceService.query(getItemsInChildrenQuery(items, CITIES_FEATURE_CODES),
null, GeonameEntry.class, offset, size));
}
private Condition getItemsInChildrenQuery(List<String> items, List<String>
featureCodes) {
@@ -296,45 +358,47 @@ public class GeonamesServiceImpl implements
GeonamesService {
}
public List<GeonameEntry> getCapitalEntries(String itemId) {
- GeonameEntry entry = persistenceService.load(itemId,
GeonameEntry.class);
- List<String> featureCodes;
-
- List<Condition> l = new ArrayList<Condition>();
- Condition andCondition = new Condition();
-
andCondition.setConditionType(definitionsService.getConditionType("booleanCondition"));
- andCondition.setParameter("operator", "and");
- andCondition.setParameter("subConditions", l);
+ return contextManager.executeAsSystem(() -> {
+ GeonameEntry entry = persistenceService.load(itemId,
GeonameEntry.class);
+ List<String> featureCodes;
+
+ List<Condition> l = new ArrayList<Condition>();
+ Condition andCondition = new Condition();
+
andCondition.setConditionType(definitionsService.getConditionType("booleanCondition"));
+ andCondition.setParameter("operator", "and");
+ andCondition.setParameter("subConditions", l);
+
+ l.add(getPropertyCondition("countryCode", "propertyValue",
entry.getCountryCode(), "equals"));
+
+ if (COUNTRY_FEATURE_CODES.contains(entry.getFeatureCode())) {
+ featureCodes = Arrays.asList("PPLC");
+ } else if (ADM1_FEATURE_CODES.contains(entry.getFeatureCode())) {
+ featureCodes = Arrays.asList("PPLA", "PPLC");
+ l.add(getPropertyCondition("admin1Code", "propertyValue",
entry.getAdmin1Code(), "equals"));
+ } else if (ADM2_FEATURE_CODES.contains(entry.getFeatureCode())) {
+ featureCodes = Arrays.asList("PPLA2", "PPLA", "PPLC");
+ l.add(getPropertyCondition("admin1Code", "propertyValue",
entry.getAdmin1Code(), "equals"));
+ l.add(getPropertyCondition("admin2Code", "propertyValue",
entry.getAdmin2Code(), "equals"));
+ } else {
+ return Collections.emptyList();
+ }
- l.add(getPropertyCondition("countryCode", "propertyValue",
entry.getCountryCode(), "equals"));
-
- if (COUNTRY_FEATURE_CODES.contains(entry.getFeatureCode())) {
- featureCodes = Arrays.asList("PPLC");
- } else if (ADM1_FEATURE_CODES.contains(entry.getFeatureCode())) {
- featureCodes = Arrays.asList("PPLA", "PPLC");
- l.add(getPropertyCondition("admin1Code", "propertyValue",
entry.getAdmin1Code(), "equals"));
- } else if (ADM2_FEATURE_CODES.contains(entry.getFeatureCode())) {
- featureCodes = Arrays.asList("PPLA2", "PPLA", "PPLC");
- l.add(getPropertyCondition("admin1Code", "propertyValue",
entry.getAdmin1Code(), "equals"));
- l.add(getPropertyCondition("admin2Code", "propertyValue",
entry.getAdmin2Code(), "equals"));
- } else {
+ Condition featureCodeCondition = new Condition();
+
featureCodeCondition.setConditionType(definitionsService.getConditionType("sessionPropertyCondition"));
+ featureCodeCondition.setParameter("propertyName", "featureCode");
+ featureCodeCondition.setParameter("propertyValues", featureCodes);
+ featureCodeCondition.setParameter("comparisonOperator", "in");
+ l.add(featureCodeCondition);
+ List<GeonameEntry> entries =
persistenceService.query(andCondition, null, GeonameEntry.class);
+ if (entries.size() == 0) {
+ featureCodeCondition.setParameter("propertyValues",
CITIES_FEATURE_CODES);
+ entries = persistenceService.query(andCondition,
"population:desc", GeonameEntry.class, 0, 1).getList();
+ }
+ if (entries.size() > 0) {
+ return getHierarchy(entries.get(0));
+ }
return Collections.emptyList();
- }
-
- Condition featureCodeCondition = new Condition();
-
featureCodeCondition.setConditionType(definitionsService.getConditionType("sessionPropertyCondition"));
- featureCodeCondition.setParameter("propertyName", "featureCode");
- featureCodeCondition.setParameter("propertyValues", featureCodes);
- featureCodeCondition.setParameter("comparisonOperator", "in");
- l.add(featureCodeCondition);
- List<GeonameEntry> entries = persistenceService.query(andCondition,
null, GeonameEntry.class);
- if (entries.size() == 0) {
- featureCodeCondition.setParameter("propertyValues",
CITIES_FEATURE_CODES);
- entries = persistenceService.query(andCondition,
"population:desc", GeonameEntry.class, 0, 1).getList();
- }
- if (entries.size() > 0) {
- return getHierarchy(entries.get(0));
- }
- return Collections.emptyList();
+ });
}
private Condition getPropertyCondition(String name, String
propertyValueField, Object value, String operator) {
diff --git
a/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java
b/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java
index 3ad70b69b..fee5f3a89 100644
---
a/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java
+++
b/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java
@@ -36,7 +36,10 @@ import org.codehaus.groovy.control.CompilerConfiguration;
import org.codehaus.groovy.control.customizers.ImportCustomizer;
import org.osgi.framework.BundleContext;
import org.osgi.framework.wiring.BundleWiring;
-import org.osgi.service.component.annotations.*;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
import org.osgi.service.metatype.annotations.Designate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.slf4j.Logger;
@@ -46,12 +49,10 @@ import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
-import java.util.Set;
-
import java.util.Map;
+import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -74,7 +75,6 @@ public class GroovyActionsServiceImpl implements
GroovyActionsService {
private BundleContext bundleContext;
private GroovyScriptEngine groovyScriptEngine;
private CompilerConfiguration compilerConfiguration;
- private ScheduledFuture<?> scheduledFuture;
private final Object compilationLock = new Object();
private GroovyShell compilationShell;
@@ -88,6 +88,7 @@ public class GroovyActionsServiceImpl implements
GroovyActionsService {
private DefinitionsService definitionsService;
private PersistenceService persistenceService;
private SchedulerService schedulerService;
+ private String refreshGroovyActionsTaskId;
private GroovyActionsServiceConfig config;
@Reference
@@ -103,9 +104,12 @@ public class GroovyActionsServiceImpl implements
GroovyActionsService {
@Reference
public void setSchedulerService(SchedulerService schedulerService) {
this.schedulerService = schedulerService;
- }
-
+ if (schedulerService != null) {
+ LOGGER.info("SchedulerService was set after GroovyActionsService
initialization, initializing scheduled tasks now");
+ initializeTimers();
+ }
+ }
@Activate
public void start(GroovyActionsServiceConfig config, BundleContext
bundleContext) {
@@ -130,15 +134,19 @@ public class GroovyActionsServiceImpl implements
GroovyActionsService {
// PRE-COMPILE ALL SCRIPTS AT STARTUP (no on-demand compilation)
preloadAllScripts();
- initializeTimers();
+ if (schedulerService != null) {
+ initializeTimers();
+ } else {
+ LOGGER.warn("SchedulerService not available during
GroovyActionsService initialization. Scheduled tasks will not be registered.
They will be registered when SchedulerService becomes available.");
+ }
LOGGER.info("Groovy action service initialized with {} scripts",
scriptMetadataCache.size());
}
@Deactivate
public void onDestroy() {
LOGGER.debug("onDestroy Method called");
- if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
- scheduledFuture.cancel(true);
+ if (schedulerService != null && refreshGroovyActionsTaskId != null) {
+ schedulerService.cancelTask(refreshGroovyActionsTaskId);
}
}
@@ -342,7 +350,7 @@ public class GroovyActionsServiceImpl implements
GroovyActionsService {
ScriptMetadata removedMetadata =
scriptMetadataCache.remove(actionName);
persistenceService.remove(actionName, GroovyAction.class);
-
+
// Clean up error tracking to prevent memory leak
loggedRefreshErrors.remove(actionName);
@@ -445,29 +453,29 @@ public class GroovyActionsServiceImpl implements
GroovyActionsService {
}
errorCount++;
-
+
// Prevent log spam for repeated compilation errors during
refresh
String errorMessage = e.getMessage();
Set<String> scriptErrors = loggedRefreshErrors.get(actionName);
-
+
if (scriptErrors == null ||
!scriptErrors.contains(errorMessage)) {
newErrorCount++;
LOGGER.error("Failed to refresh script: {}", actionName,
e);
-
+
// Prevent memory leak by limiting tracked errors before
adding new entries
if (scriptErrors == null && loggedRefreshErrors.size() >=
MAX_LOGGED_ERRORS) {
// Remove one random entry to make space (simple
eviction)
String firstKey =
loggedRefreshErrors.keySet().iterator().next();
loggedRefreshErrors.remove(firstKey);
}
-
+
// Now safely add the error
if (scriptErrors == null) {
scriptErrors = ConcurrentHashMap.newKeySet();
loggedRefreshErrors.put(actionName, scriptErrors);
}
scriptErrors.add(errorMessage);
-
+
LOGGER.warn("Keeping existing version of script {} due to
compilation error", actionName);
}
@@ -502,7 +510,9 @@ public class GroovyActionsServiceImpl implements
GroovyActionsService {
refreshGroovyActions();
}
};
- scheduledFuture =
schedulerService.getScheduleExecutorService().scheduleWithFixedDelay(task, 0,
config.services_groovy_actions_refresh_interval(),
- TimeUnit.MILLISECONDS);
+ if (this.refreshGroovyActionsTaskId != null) {
+ schedulerService.cancelTask(this.refreshGroovyActionsTaskId);
+ }
+ this.refreshGroovyActionsTaskId =
schedulerService.createRecurringTask("refreshGroovyActions",
config.services_groovy_actions_refresh_interval(), TimeUnit.MILLISECONDS, task,
false).getItemId();
}
}
diff --git
a/extensions/json-schema/services/src/main/java/org/apache/unomi/schema/impl/SchemaServiceImpl.java
b/extensions/json-schema/services/src/main/java/org/apache/unomi/schema/impl/SchemaServiceImpl.java
index da7efeac2..f7e68b783 100644
---
a/extensions/json-schema/services/src/main/java/org/apache/unomi/schema/impl/SchemaServiceImpl.java
+++
b/extensions/json-schema/services/src/main/java/org/apache/unomi/schema/impl/SchemaServiceImpl.java
@@ -27,7 +27,9 @@ import com.networknt.schema.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.unomi.api.Item;
+import org.apache.unomi.api.services.SchedulerService;
import org.apache.unomi.api.services.ScopeService;
+import org.apache.unomi.api.tasks.ScheduledTask;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.schema.api.JsonSchemaWrapper;
import org.apache.unomi.schema.api.SchemaService;
@@ -47,10 +49,8 @@ import java.util.stream.Collectors;
public class SchemaServiceImpl implements SchemaService {
private static final String URI =
"https://json-schema.org/draft/2019-09/schema";
-
private static final Logger LOGGER =
LoggerFactory.getLogger(SchemaServiceImpl.class.getName());
private static final String TARGET_EVENTS = "events";
-
private static final String GENERIC_ERROR_KEY = "error";
ObjectMapper objectMapper = new ObjectMapper();
@@ -67,18 +67,12 @@ public class SchemaServiceImpl implements SchemaService {
* Available extensions indexed by key:schema URI to be extended, value:
list of schema extension URIs
*/
private ConcurrentMap<String, Set<String>> extensions = new
ConcurrentHashMap<>();
-
private Integer jsonSchemaRefreshInterval = 1000;
- private ScheduledFuture<?> scheduledFuture;
-
private PersistenceService persistenceService;
private ScopeService scopeService;
-
private JsonSchemaFactory jsonSchemaFactory;
-
- // TODO UNOMI-572: when fixing UNOMI-572 please remove the usage of the
custom ScheduledExecutorService and re-introduce the Unomi Scheduler Service
- private ScheduledExecutorService scheduler;
- //private SchedulerService schedulerService;
+ private SchedulerService schedulerService;
+ private String refreshJSONSchemasTaskId;
@Override
public boolean isValid(String data, String schemaId) {
@@ -378,14 +372,22 @@ public class SchemaServiceImpl implements SchemaService {
TimerTask task = new TimerTask() {
@Override
public void run() {
- try {
- refreshJSONSchemas();
- } catch (Exception e) {
- LOGGER.error("Unexpected error while refreshing JSON
Schemas", e);
- }
+ try {
+ refreshJSONSchemas();
+ } catch (Exception e) {
+ LOGGER.error("Unexpected error while refreshing JSON Schemas",
e);
+ }
}
};
- scheduledFuture = scheduler.scheduleWithFixedDelay(task, 0,
jsonSchemaRefreshInterval, TimeUnit.MILLISECONDS);
+ this.resetTimers();
+ this.refreshJSONSchemasTaskId =
schedulerService.createRecurringTask("refreshJSONSchemas",
jsonSchemaRefreshInterval, TimeUnit.MILLISECONDS, task, false).getItemId();
+ }
+
+ private void resetTimers() {
+ if (this.refreshJSONSchemasTaskId != null) {
+ schedulerService.cancelTask(this.refreshJSONSchemasTaskId);
+ this.refreshJSONSchemasTaskId = null;
+ }
}
private void initJsonSchemaFactory() {
@@ -414,17 +416,13 @@ public class SchemaServiceImpl implements SchemaService {
}
public void init() {
- scheduler = Executors.newSingleThreadScheduledExecutor();
- initJsonSchemaFactory();
- initTimers();
+ this.initJsonSchemaFactory();
+ this.initTimers();
LOGGER.info("Schema service initialized.");
}
public void destroy() {
- scheduledFuture.cancel(true);
- if (scheduler != null) {
- scheduler.shutdown();
- }
+ this.resetTimers();
LOGGER.info("Schema service shutdown.");
}
diff --git
a/extensions/json-schema/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/extensions/json-schema/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 9998ad930..399f81944 100644
---
a/extensions/json-schema/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++
b/extensions/json-schema/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -29,13 +29,13 @@
<reference id="scopeService"
interface="org.apache.unomi.api.services.ScopeService"/>
<reference id="persistenceService"
interface="org.apache.unomi.persistence.spi.PersistenceService"/>
- <!--reference id="schedulerService"
interface="org.apache.unomi.api.services.SchedulerService"/-->
+ <reference id="schedulerService"
interface="org.apache.unomi.api.services.SchedulerService"/>
<bean id="schemaServiceImpl"
class="org.apache.unomi.schema.impl.SchemaServiceImpl" init-method="init"
destroy-method="destroy">
<property name="persistenceService" ref="persistenceService"/>
<property name="scopeService" ref="scopeService"/>
- <!--property name="schedulerService" ref="schedulerService"/-->
+ <property name="schedulerService" ref="schedulerService"/>
<property name="jsonSchemaRefreshInterval"
value="${json.schema.refresh.interval}"/>
</bean>
<service id="schemaService" ref="schemaServiceImpl"
interface="org.apache.unomi.schema.api.SchemaService"/>
diff --git
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index a333490ab..e89c2e8f5 100644
---
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -26,6 +26,8 @@ import org.apache.unomi.api.actions.Action;
import org.apache.unomi.api.actions.ActionExecutor;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.services.*;
+import org.apache.unomi.api.tasks.ScheduledTask;
+import org.apache.unomi.api.tasks.TaskExecutor;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -154,27 +156,63 @@ public class MergeProfilesOnPropertyAction implements
ActionExecutor {
}
private void reassignPersistedBrowsingDatasAsync(boolean
anonymousBrowsing, List<String> mergedProfileIds, String masterProfileId) {
- schedulerService.getSharedScheduleExecutorService().schedule(new
TimerTask() {
+ // Register task executor for data reassignment
+ String taskType = "merge-profiles-reassign-data";
+
+ // Create a reusable executor that can handle the parameters
+ TaskExecutor mergeProfilesReassignDataExecutor = new TaskExecutor() {
@Override
- public void run() {
- if (!anonymousBrowsing) {
- Condition profileIdsCondition = new
Condition(definitionsService.getConditionType("eventPropertyCondition"));
-
profileIdsCondition.setParameter("propertyName","profileId");
-
profileIdsCondition.setParameter("comparisonOperator","in");
- profileIdsCondition.setParameter("propertyValues",
mergedProfileIds);
-
- String[] scripts = new String[]{"updateProfileId"};
- Map<String, Object>[] scriptParams = new
Map[]{Collections.singletonMap("profileId", masterProfileId)};
- Condition[] conditions = new
Condition[]{profileIdsCondition};
-
- persistenceService.updateWithQueryAndStoredScript(new
Class[]{Session.class, Event.class}, scripts, scriptParams, conditions, false);
- } else {
- for (String mergedProfileId : mergedProfileIds) {
- privacyService.anonymizeBrowsingData(mergedProfileId);
+ public String getTaskType() {
+ return taskType;
+ }
+
+ @Override
+ public void execute(ScheduledTask task,
TaskExecutor.TaskStatusCallback callback) {
+ try {
+ Map<String, Object> parameters = task.getParameters();
+ boolean isAnonymousBrowsing = (boolean)
parameters.get("anonymousBrowsing");
+ @SuppressWarnings("unchecked")
+ List<String> profilesIds = (List<String>)
parameters.get("mergedProfileIds");
+ String masterProfile = (String)
parameters.get("masterProfileId");
+
+                    if (!isAnonymousBrowsing) {
+                        Condition profileIdsCondition = new Condition(definitionsService.getConditionType("eventPropertyCondition"));
+                        profileIdsCondition.setParameter("propertyName","profileId");
+                        profileIdsCondition.setParameter("comparisonOperator","in");
+                        profileIdsCondition.setParameter("propertyValues", profilesIds);
+
+                        String[] scripts = new String[]{"updateProfileId"};
+                        Map<String, Object>[] scriptParams = new Map[]{Collections.singletonMap("profileId", masterProfile)};
+                        Condition[] conditions = new Condition[]{profileIdsCondition};
+
+                        persistenceService.updateWithQueryAndStoredScript(new Class[]{Session.class, Event.class}, scripts, scriptParams, conditions, false);
+                    } else {
+                        for (String mergedProfileId : profilesIds) {
+                            privacyService.anonymizeBrowsingData(mergedProfileId);
+ }
}
+
+ callback.complete();
+ } catch (Exception e) {
+ LOGGER.error("Error while reassigning profile data", e);
+ callback.fail(e.getMessage());
}
}
- }, 1000, TimeUnit.MILLISECONDS);
+ };
+
+
+        // Register the executor. NOTE(review): this runs on every merge invocation for the
+        // same taskType — assumes registerTaskExecutor is idempotent / replaces the previous
+        // registration; TODO confirm in SchedulerService.
+        schedulerService.registerTaskExecutor(mergeProfilesReassignDataExecutor);
+
+ // Create a one-shot task for async data reassignment
+ schedulerService.newTask(taskType)
+ .withParameters(Map.of(
+ "anonymousBrowsing", anonymousBrowsing,
+ "mergedProfileIds", mergedProfileIds,
+ "masterProfileId", masterProfileId
+ ))
+ .withInitialDelay(1000, TimeUnit.MILLISECONDS)
+ .asOneShot()
+ .schedule();
}
private void reassignCurrentBrowsingData(Event event, List<Profile>
existingMergedProfiles, boolean forceEventProfileAsMaster, String
mergePropName, String mergePropValue) {
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
index f741d8d5f..9802426e7 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
@@ -58,8 +58,8 @@ public class ClusterServiceImpl implements ClusterService {
private volatile List<ClusterNode> cachedClusterNodes =
Collections.emptyList();
private BundleWatcher bundleWatcher;
- private String updateSystemStatsTaskId;
- private String cleanupStaleNodesTaskId;
+ private String clusterNodeStatisticsUpdateTaskId;
+ private String clusterStaleNodesCleanupTaskId;
/**
* Max time to wait for persistence service (in milliseconds)
@@ -206,12 +206,10 @@ public class ClusterServiceImpl implements ClusterService
{
* This method can be called later if schedulerService wasn't available
during init.
*/
public void initializeScheduledTasks() {
- /* Wait for PR UNOMI-878 to reactivate that code
if (schedulerService == null) {
LOGGER.error("Cannot initialize scheduled tasks: SchedulerService
is not set");
return;
}
- */
// Schedule regular updates of the node statistics
TimerTask statisticsTask = new TimerTask() {
@@ -224,7 +222,7 @@ public class ClusterServiceImpl implements ClusterService {
}
}
};
- updateSystemStatsTaskId =
schedulerService.createRecurringTask("clusterNodeStatisticsUpdate",
nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS, statisticsTask,
false).getItemId();
+ this.clusterNodeStatisticsUpdateTaskId =
schedulerService.createRecurringTask("clusterNodeStatisticsUpdate",
nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS, statisticsTask,
false).getItemId();
// Schedule cleanup of stale nodes
TimerTask cleanupTask = new TimerTask() {
@@ -237,7 +235,7 @@ public class ClusterServiceImpl implements ClusterService {
}
}
};
- cleanupStaleNodesTaskId =
schedulerService.createRecurringTask("clusterStaleNodesCleanup", 60000,
TimeUnit.MILLISECONDS, cleanupTask, false).getItemId();
+ this.clusterStaleNodesCleanupTaskId =
schedulerService.createRecurringTask("clusterStaleNodesCleanup", 60000,
TimeUnit.MILLISECONDS, cleanupTask, false).getItemId();
LOGGER.info("Cluster service scheduled tasks initialized");
}
@@ -247,11 +245,13 @@ public class ClusterServiceImpl implements ClusterService
{
shutdownNow = true;
// Cancel scheduled tasks
- if (updateSystemStatsTaskId != null) {
- schedulerService.cancelTask(updateSystemStatsTaskId);
+        if (schedulerService != null && clusterNodeStatisticsUpdateTaskId != null) {
+            schedulerService.cancelTask(clusterNodeStatisticsUpdateTaskId);
+            clusterNodeStatisticsUpdateTaskId = null;
}
- if (cleanupStaleNodesTaskId != null) {
- schedulerService.cancelTask(cleanupStaleNodesTaskId);
+ if (schedulerService != null && clusterStaleNodesCleanupTaskId !=
null) {
+ schedulerService.cancelTask(clusterStaleNodesCleanupTaskId);
+ clusterStaleNodesCleanupTaskId = null;
}
// Remove node from persistence service
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java
index 8fa7e1e68..ff7babc2c 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java
@@ -26,9 +26,9 @@ import org.apache.unomi.api.conditions.ConditionType;
import org.apache.unomi.api.services.DefinitionsService;
import org.apache.unomi.api.services.SchedulerService;
import org.apache.unomi.api.utils.ConditionBuilder;
+import org.apache.unomi.api.utils.ParserHelper;
import org.apache.unomi.persistence.spi.CustomObjectMapper;
import org.apache.unomi.persistence.spi.PersistenceService;
-import org.apache.unomi.api.utils.ParserHelper;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleEvent;
@@ -38,17 +38,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimerTask;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -70,6 +60,8 @@ public class DefinitionsServiceImpl implements
DefinitionsService, SynchronousBu
private ConditionBuilder conditionBuilder;
private BundleContext bundleContext;
+ private String reloadTypesTaskId;
+
public DefinitionsServiceImpl() {
}
@@ -114,10 +106,17 @@ public class DefinitionsServiceImpl implements
DefinitionsService, SynchronousBu
reloadTypes(false);
}
};
-
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 10000,
definitionsRefreshInterval, TimeUnit.MILLISECONDS);
+ this.resetTypeReloads();
+ this.reloadTypesTaskId =
schedulerService.createRecurringTask("reloadTypes", definitionsRefreshInterval,
TimeUnit.MILLISECONDS, task, false).getItemId();
LOGGER.info("Scheduled task for condition type loading each 10s");
}
+ private void resetTypeReloads() {
+ if (this.reloadTypesTaskId != null) {
+            schedulerService.cancelTask(this.reloadTypesTaskId);
+            this.reloadTypesTaskId = null;
+        }
+ }
+
public void reloadTypes(boolean refresh) {
try {
if (refresh) {
@@ -190,6 +189,7 @@ public class DefinitionsServiceImpl implements
DefinitionsService, SynchronousBu
}
public void preDestroy() {
+ this.resetTypeReloads();
bundleContext.removeBundleListener(this);
LOGGER.info("Definitions service shutdown.");
}
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index 7dd5db65f..78ca73959 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -183,14 +183,15 @@ public class ProfileServiceImpl implements
ProfileService, SynchronousBundleList
private Integer purgeSessionExistTime = 0;
private Integer purgeEventExistTime = 0;
private Integer purgeProfileInterval = 0;
- private TimerTask purgeTask = null;
private long propertiesRefreshInterval = 10000;
private PropertyTypes propertyTypes;
- private TimerTask propertyTypeLoadTask = null;
private boolean forceRefreshOnSave = false;
+ private String propertyTypeLoadTaskId;
+ private String purgeProfilesTaskId;
+
public ProfileServiceImpl() {
LOGGER.info("Initializing profile service...");
}
@@ -241,12 +242,8 @@ public class ProfileServiceImpl implements ProfileService,
SynchronousBundleList
}
public void preDestroy() {
- if (purgeTask != null) {
- purgeTask.cancel();
- }
- if (propertyTypeLoadTask != null) {
- propertyTypeLoadTask.cancel();
- }
+ this.resetProfilesPurgeTask();
+ this.resetPropertyTypeLoadTask();
bundleContext.removeBundleListener(this);
LOGGER.info("Profile service shutdown.");
}
@@ -304,14 +301,21 @@ public class ProfileServiceImpl implements
ProfileService, SynchronousBundleList
}
private void schedulePropertyTypeLoad() {
- propertyTypeLoadTask = new TimerTask() {
+ TimerTask task = new TimerTask() {
@Override
public void run() {
reloadPropertyTypes(false);
}
};
-
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(propertyTypeLoadTask,
10000, propertiesRefreshInterval, TimeUnit.MILLISECONDS);
- LOGGER.info("Scheduled task for property type loading each 10s");
+ this.resetPropertyTypeLoadTask();
+ this.propertyTypeLoadTaskId =
schedulerService.createRecurringTask("propertyTypeLoad",
propertiesRefreshInterval, TimeUnit.MILLISECONDS, task, false).getItemId();
+ LOGGER.info("Scheduled task for property type loading each {}ms",
propertiesRefreshInterval);
+ }
+
+ private void resetPropertyTypeLoadTask() {
+ if (this.propertyTypeLoadTaskId != null) {
+            schedulerService.cancelTask(this.propertyTypeLoadTaskId);
+            this.propertyTypeLoadTaskId = null;
+        }
}
public void reloadPropertyTypes(boolean refresh) {
@@ -410,7 +414,7 @@ public class ProfileServiceImpl implements ProfileService,
SynchronousBundleList
LOGGER.info("Purge: Event items created since more than {}
days, will be purged", purgeEventExistTime);
}
- purgeTask = new TimerTask() {
+ TimerTask task = new TimerTask() {
@Override
public void run() {
try {
@@ -429,8 +433,8 @@ public class ProfileServiceImpl implements ProfileService,
SynchronousBundleList
}
}
};
-
-
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(purgeTask, 1,
purgeProfileInterval, TimeUnit.DAYS);
+ this.resetProfilesPurgeTask();
+ this.purgeProfilesTaskId =
schedulerService.createRecurringTask("profilesPurge", purgeProfileInterval,
TimeUnit.DAYS, task, false).getItemId();
LOGGER.info("Purge: purge scheduled with an interval of {} days",
purgeProfileInterval);
} else {
@@ -438,6 +442,12 @@ public class ProfileServiceImpl implements ProfileService,
SynchronousBundleList
}
}
+ private void resetProfilesPurgeTask() {
+ if (this.purgeProfilesTaskId != null) {
+ schedulerService.cancelTask(this.purgeProfilesTaskId);
+ this.purgeProfilesTaskId = null;
+ }
+ }
public long getAllProfilesCount() {
return persistenceService.getAllItemsCount(Profile.ITEM_TYPE);
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
index fe3fe8fb0..71cc75eb5 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
@@ -71,6 +71,9 @@ public class RulesServiceImpl implements RulesService,
EventListenerService, Syn
private Map<String, Set<Rule>> rulesByEventType = new HashMap<>();
private Boolean optimizedRulesActivated = true;
+ private String refreshRulesTaskId;
+ private String syncRuleStatisticsTaskId;
+
public void setBundleContext(BundleContext bundleContext) {
this.bundleContext = bundleContext;
}
@@ -133,11 +136,12 @@ public class RulesServiceImpl implements RulesService,
EventListenerService, Syn
bundleContext.addBundleListener(this);
- initializeTimers();
+ this.initializeTimers();
LOGGER.info("Rule service initialized.");
}
public void preDestroy() {
+ this.resetTimers();
bundleContext.removeBundleListener(this);
LOGGER.info("Rule service shutdown.");
}
@@ -488,25 +492,37 @@ public class RulesServiceImpl implements RulesService,
EventListenerService, Syn
}
private void initializeTimers() {
+ this.resetTimers();
TimerTask task = new TimerTask() {
@Override
public void run() {
refreshRules();
}
};
-
schedulerService.getScheduleExecutorService().scheduleWithFixedDelay(task, 0,
rulesRefreshInterval, TimeUnit.MILLISECONDS);
+ this.refreshRulesTaskId =
schedulerService.createRecurringTask("refreshRules", rulesRefreshInterval,
TimeUnit.MILLISECONDS, task, false).getItemId();
TimerTask statisticsTask = new TimerTask() {
@Override
public void run() {
- try {
- syncRuleStatistics();
- } catch (Throwable t) {
- LOGGER.error("Error synching rule statistics between
memory and persistence back-end", t);
- }
+ try {
+ syncRuleStatistics();
+ } catch (Throwable t) {
+ LOGGER.error("Error synching rule statistics between memory
and persistence back-end", t);
+ }
}
};
-
schedulerService.getScheduleExecutorService().scheduleWithFixedDelay(statisticsTask,
0, rulesStatisticsRefreshInterval, TimeUnit.MILLISECONDS);
+ this.syncRuleStatisticsTaskId =
schedulerService.createRecurringTask("syncRuleStatistics",
rulesStatisticsRefreshInterval, TimeUnit.MILLISECONDS, statisticsTask,
false).getItemId();
+ }
+
+ private void resetTimers() {
+ if (refreshRulesTaskId != null) {
+ schedulerService.cancelTask(refreshRulesTaskId);
+ refreshRulesTaskId = null;
+ }
+ if (syncRuleStatisticsTaskId != null) {
+ schedulerService.cancelTask(syncRuleStatisticsTaskId);
+ syncRuleStatisticsTaskId = null;
+ }
}
public void bundleChanged(BundleEvent event) {
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/scope/ScopeServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/scope/ScopeServiceImpl.java
index 701109d9f..179392a33 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/scope/ScopeServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/scope/ScopeServiceImpl.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -41,7 +40,7 @@ public class ScopeServiceImpl implements ScopeService {
private ConcurrentMap<String, Scope> scopes = new ConcurrentHashMap<>();
- private ScheduledFuture<?> scheduledFuture;
+ private String refreshScopesTaskId;
public void setPersistenceService(PersistenceService persistenceService) {
this.persistenceService = persistenceService;
@@ -56,11 +55,11 @@ public class ScopeServiceImpl implements ScopeService {
}
public void postConstruct() {
- initializeTimers();
+ this.initializeTimers();
}
public void preDestroy() {
- scheduledFuture.cancel(true);
+ this.resetTimers();
}
@Override
@@ -90,8 +89,15 @@ public class ScopeServiceImpl implements ScopeService {
refreshScopes();
}
};
- scheduledFuture = schedulerService.getScheduleExecutorService()
- .scheduleWithFixedDelay(task, 0, scopesRefreshInterval,
TimeUnit.MILLISECONDS);
+ this.resetTimers();
+ this.refreshScopesTaskId =
schedulerService.createRecurringTask("refreshScopes", scopesRefreshInterval,
TimeUnit.MILLISECONDS, task, false).getItemId();
+ }
+
+ private void resetTimers() {
+ if (refreshScopesTaskId != null) {
+ schedulerService.cancelTask(refreshScopesTaskId);
+ refreshScopesTaskId = null;
+ }
}
private void refreshScopes() {
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 1bc8730f4..453203bcd 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.unomi.api.*;
import org.apache.unomi.api.actions.Action;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.conditions.ConditionType;
+import org.apache.unomi.api.exceptions.BadSegmentConditionException;
import org.apache.unomi.api.query.Query;
import org.apache.unomi.api.rules.Rule;
import org.apache.unomi.api.segments.*;
@@ -32,12 +33,11 @@ import org.apache.unomi.api.services.RulesService;
import org.apache.unomi.api.services.SchedulerService;
import org.apache.unomi.api.services.SegmentService;
import org.apache.unomi.api.utils.ConditionBuilder;
+import org.apache.unomi.api.utils.ParserHelper;
import org.apache.unomi.persistence.spi.CustomObjectMapper;
import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
import org.apache.unomi.services.impl.AbstractServiceImpl;
import org.apache.unomi.services.impl.scheduler.SchedulerServiceImpl;
-import org.apache.unomi.api.utils.ParserHelper;
-import org.apache.unomi.api.exceptions.BadSegmentConditionException;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleEvent;
@@ -83,6 +83,8 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
private int maximumIdsQueryCount = 5000;
private boolean pastEventsDisablePartitions = false;
private int dailyDateExprEvaluationHourUtc = 5;
+ private String recalculatePastEventConditionsTaskId;
+ private String refreshSegmentAndScoringDefinitionsTaskId;
public SegmentServiceImpl() {
LOGGER.info("Initializing segment service...");
@@ -155,11 +157,12 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
}
}
bundleContext.addBundleListener(this);
- initializeTimer();
+ this.initializeTimer();
LOGGER.info("Segment service initialized.");
}
public void preDestroy() {
+ this.resetTimers();
bundleContext.removeBundleListener(this);
LOGGER.info("Segment service shutdown.");
}
@@ -1196,7 +1199,7 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
}
private void initializeTimer() {
-
+ this.resetTimers();
TimerTask task = new TimerTask() {
@Override
public void run() {
@@ -1214,7 +1217,7 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
long period = TimeUnit.DAYS.toSeconds(taskExecutionPeriod);
LOGGER.info("daily recalculation job for segments and scoring that
contains date relative conditions will run at fixed rate, " +
"initialDelay={}, taskExecutionPeriod={} in seconds",
initialDelay, period);
-
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task,
initialDelay, period, TimeUnit.SECONDS);
+ this.recalculatePastEventConditionsTaskId =
schedulerService.createRecurringTask("recalculatePastEventConditions", period,
TimeUnit.SECONDS, task, false).getItemId();
task = new TimerTask() {
@Override
@@ -1227,7 +1230,18 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
}
}
};
-
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 0,
segmentRefreshInterval, TimeUnit.MILLISECONDS);
+ this.refreshSegmentAndScoringDefinitionsTaskId =
schedulerService.createRecurringTask("refreshSegmentAndScoringDefinitions",
segmentRefreshInterval, TimeUnit.MILLISECONDS, task, false).getItemId();
+ }
+
+ private void resetTimers() {
+ if (this.recalculatePastEventConditionsTaskId != null) {
+
schedulerService.cancelTask(this.recalculatePastEventConditionsTaskId);
+ this.recalculatePastEventConditionsTaskId = null;
+ }
+ if (this.refreshSegmentAndScoringDefinitionsTaskId != null) {
+
schedulerService.cancelTask(this.refreshSegmentAndScoringDefinitionsTaskId);
+ this.refreshSegmentAndScoringDefinitionsTaskId = null;
+ }
}
public void setTaskExecutionPeriod(long taskExecutionPeriod) {
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 4e23f5cef..7abef9ac4 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -367,9 +367,7 @@
<property name="nodeId" value="${cluster.nodeId}"/>
<property name="nodeStatisticsUpdateFrequency"
value="${cluster.nodeStatisticsUpdateFrequency}"/>
<property name="bundleWatcher" ref="bundleWatcher"/>
- <!-- Wait for UNOMI-878 to be available to activate that
<property name="schedulerService" ref="schedulerServiceImpl"/>
- -->
</bean>
<service id="clusterService" ref="clusterServiceImpl"
interface="org.apache.unomi.api.services.ClusterService"/>
diff --git
a/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/CancelTaskCommand.java
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/CancelTaskCommand.java
new file mode 100644
index 000000000..2cff5ff5c
--- /dev/null
+++
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/CancelTaskCommand.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.shell.commands.scheduler;
+
+import org.apache.karaf.shell.api.action.Action;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Reference;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.unomi.api.services.SchedulerService;
+
+@Command(scope = "unomi", name = "task-cancel", description = "Cancels a
scheduled task")
+@Service
+public class CancelTaskCommand implements Action {
+
+ @Reference
+ private SchedulerService schedulerService;
+
+ @Argument(index = 0, name = "taskId", description = "The ID of the task to
cancel", required = true)
+ private String taskId;
+
+ @Override
+ public Object execute() throws Exception {
+ schedulerService.cancelTask(taskId);
+ System.out.println("Task " + taskId + " has been cancelled
successfully.");
+ return null;
+ }
+}
diff --git
a/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/ListTasksCommand.java
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/ListTasksCommand.java
new file mode 100644
index 000000000..c6c71a6e1
--- /dev/null
+++
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/ListTasksCommand.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.shell.commands.scheduler;
+
+import org.apache.karaf.shell.api.action.Action;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Reference;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.karaf.shell.support.table.Col;
+import org.apache.karaf.shell.support.table.ShellTable;
+import org.apache.unomi.api.PartialList;
+import org.apache.unomi.api.services.SchedulerService;
+import org.apache.unomi.api.tasks.ScheduledTask;
+
+import java.text.SimpleDateFormat;
+import java.util.List;
+
+@Command(scope = "unomi", name = "task-list", description = "Lists scheduled
tasks")
+@Service
+public class ListTasksCommand implements Action {
+
+ @Reference
+ private SchedulerService schedulerService;
+
+ @Option(name = "-s", aliases = "--status", description = "Filter by task
status (SCHEDULED, RUNNING, COMPLETED, FAILED, CANCELLED, CRASHED)", required =
false)
+ private String status;
+
+ @Option(name = "-t", aliases = "--type", description = "Filter by task
type", required = false)
+ private String type;
+
+ @Option(name = "--limit", description = "Maximum number of tasks to
display (default: 50)", required = false)
+ private int limit = 50;
+
+ @Override
+ public Object execute() throws Exception {
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss");
+ ShellTable table = new ShellTable();
+
+ // Configure table columns
+ table.column(new Col("ID").maxSize(36));
+ table.column(new Col("Type").maxSize(30));
+ table.column(new Col("Status").maxSize(10));
+ table.column(new Col("Next Run").maxSize(19));
+ table.column(new Col("Last Run").maxSize(19));
+ table.column(new Col("Failures").alignRight());
+ table.column(new Col("Successes").alignRight());
+ table.column(new Col("Total Exec").alignRight());
+ table.column(new Col("Persistent").maxSize(10));
+
+ // Get tasks based on filters
+ List<ScheduledTask> tasks;
+ if (status != null) {
+ try {
+ ScheduledTask.TaskStatus taskStatus =
ScheduledTask.TaskStatus.valueOf(status.toUpperCase());
+ // Get persistent tasks
+ PartialList<ScheduledTask> filteredTasks =
schedulerService.getTasksByStatus(taskStatus, 0, limit, null);
+ tasks = filteredTasks.getList();
+ // Add memory tasks with matching status
+ List<ScheduledTask> memoryTasks =
schedulerService.getMemoryTasks();
+ for (ScheduledTask task : memoryTasks) {
+ if (task.getStatus() == taskStatus) {
+ tasks.add(task);
+ }
+ }
+ } catch (IllegalArgumentException e) {
+ System.err.println("Invalid status: " + status);
+ return null;
+ }
+ } else if (type != null) {
+ // Get persistent tasks
+ PartialList<ScheduledTask> filteredTasks =
schedulerService.getTasksByType(type, 0, limit, null);
+ tasks = filteredTasks.getList();
+ // Add memory tasks with matching type
+ List<ScheduledTask> memoryTasks =
schedulerService.getMemoryTasks();
+ for (ScheduledTask task : memoryTasks) {
+ if (task.getTaskType().equals(type)) {
+ tasks.add(task);
+ }
+ }
+ } else {
+ // Get all tasks from both storage and memory
+ tasks = schedulerService.getAllTasks();
+ if (tasks.size() > limit) {
+ tasks = tasks.subList(0, limit);
+ }
+ }
+
+ // Add rows to table
+ for (ScheduledTask task : tasks) {
+ int totalExecutions = task.getSuccessCount() +
task.getFailureCount();
+
+ table.addRow().addContent(
+ task.getItemId(),
+ task.getTaskType(),
+ task.getStatus(),
+ task.getNextScheduledExecution() != null ?
dateFormat.format(task.getNextScheduledExecution()) : "-",
+ task.getLastExecutionDate() != null ?
dateFormat.format(task.getLastExecutionDate()) : "-",
+ task.getFailureCount(),
+ task.getSuccessCount(),
+ totalExecutions,
+ task.isPersistent() ? "Storage" : "Memory"
+ );
+ }
+
+ table.print(System.out);
+
+ if (tasks.isEmpty()) {
+ System.out.println("No tasks found.");
+ } else {
+ int persistentCount = (int)
tasks.stream().filter(ScheduledTask::isPersistent).count();
+ int memoryCount = tasks.size() - persistentCount;
+ System.out.println("\nShowing " + tasks.size() + " task(s) (" +
+ persistentCount + " in storage, " + memoryCount + " in
memory)" +
+ (status != null ? " with status " + status : "") +
+ (type != null ? " of type " + type : ""));
+ }
+
+ return null;
+ }
+}
diff --git
a/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/PurgeTasksCommand.java
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/PurgeTasksCommand.java
new file mode 100644
index 000000000..3cd357cc5
--- /dev/null
+++
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/PurgeTasksCommand.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.shell.commands.scheduler;
+
+import org.apache.karaf.shell.api.action.Action;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Reference;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.karaf.shell.api.console.Session;
+import org.apache.unomi.api.services.SchedulerService;
+import org.apache.unomi.api.tasks.ScheduledTask;
+
+import java.util.Calendar;
+import java.util.Date;
+
+@Command(scope = "unomi", name = "task-purge", description = "Purges old
completed tasks")
+@Service
+public class PurgeTasksCommand implements Action {
+
+ @Reference
+ private SchedulerService schedulerService;
+
+ @Reference
+ private Session session;
+
+ @Option(name = "-d", aliases = "--days", description = "Number of days to
keep completed tasks (default: 7)", required = false)
+ private int daysToKeep = 7;
+
+ @Option(name = "-f", aliases = "--force", description = "Skip confirmation
prompt", required = false)
+ private boolean force = false;
+
+ @Override
+ public Object execute() throws Exception {
+ if (!force) {
+ String response = session.readLine(
+ "This will permanently delete all completed tasks older than "
+ daysToKeep + " days. Continue? (y/n): ",
+ null
+ );
+ if (!"y".equalsIgnoreCase(response != null ? response.trim() :
"n")) {
+ System.out.println("Operation cancelled.");
+ return null;
+ }
+ }
+
+ // Calculate cutoff date
+ Calendar cal = Calendar.getInstance();
+ cal.add(Calendar.DAY_OF_MONTH, -daysToKeep);
+ Date cutoffDate = cal.getTime();
+
+ // Get completed tasks
+ int offset = 0;
+ int batchSize = 100;
+ int purgedCount = 0;
+
+ while (true) {
+ var tasks =
schedulerService.getTasksByStatus(ScheduledTask.TaskStatus.COMPLETED, offset,
batchSize, null);
+ if (tasks.getList().isEmpty()) {
+ break;
+ }
+
+ // Cancel old completed tasks
+ for (ScheduledTask task : tasks.getList()) {
+ if (task.getLastExecutionDate() != null &&
task.getLastExecutionDate().before(cutoffDate)) {
+ schedulerService.cancelTask(task.getItemId());
+ purgedCount++;
+ }
+ }
+
+ if (tasks.getList().size() < batchSize) {
+ break;
+ }
+ offset += batchSize;
+ }
+
+ System.out.println("Successfully purged " + purgedCount + " completed
tasks older than " + daysToKeep + " days.");
+ return null;
+ }
+}
diff --git
a/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/RetryTaskCommand.java
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/RetryTaskCommand.java
new file mode 100644
index 000000000..bcdf26e03
--- /dev/null
+++
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/RetryTaskCommand.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.shell.commands.scheduler;
+
+import org.apache.karaf.shell.api.action.Action;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Reference;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.unomi.api.services.SchedulerService;
+
+@Command(scope = "unomi", name = "task-retry", description = "Retries a failed
task")
+@Service
+public class RetryTaskCommand implements Action {
+
+ @Reference
+ private SchedulerService schedulerService;
+
+ @Argument(index = 0, name = "taskId", description = "The ID of the task to
retry", required = true)
+ private String taskId;
+
+ @Option(name = "-r", aliases = "--reset", description = "Reset failure
count before retrying")
+ private boolean resetFailureCount = false;
+
+ @Override
+ public Object execute() throws Exception {
+ schedulerService.retryTask(taskId, resetFailureCount);
+ System.out.println("Task " + taskId + " has been queued for retry" +
+ (resetFailureCount ? " with reset failure count." : "."));
+ return null;
+ }
+}
diff --git
a/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/SetExecutorNodeCommand.java
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/SetExecutorNodeCommand.java
new file mode 100644
index 000000000..c52e70873
--- /dev/null
+++
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/SetExecutorNodeCommand.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.shell.commands.scheduler;
+
+import org.apache.karaf.shell.api.action.Action;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Reference;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.unomi.api.services.SchedulerService;
+
+@Command(scope = "unomi", name = "task-executor", description = "Shows or
changes task executor status for this node")
+@Service
+public class SetExecutorNodeCommand implements Action {
+
+ @Reference
+ private SchedulerService schedulerService;
+
+ @Argument(index = 0, name = "enable", description = "Enable (true) or
disable (false) task execution", required = false)
+ private String enable;
+
+ @Override
+ public Object execute() throws Exception {
+ if (enable == null) {
+ // Just show current status
+ System.out.println("Task executor status: " +
+ (schedulerService.isExecutorNode() ? "ENABLED" : "DISABLED"));
+ System.out.println("Node ID: " + schedulerService.getNodeId());
+ return null;
+ }
+
+ boolean shouldEnable = Boolean.parseBoolean(enable);
+ // Note: This assumes there's a setExecutorNode method. If not
available, we'll need to modify the service.
+ // schedulerService.setExecutorNode(shouldEnable);
+
+ System.out.println("Task executor has been " + (shouldEnable ?
"ENABLED" : "DISABLED") +
+ " for node " + schedulerService.getNodeId());
+ return null;
+ }
+}
diff --git
a/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/ShowTaskCommand.java
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/ShowTaskCommand.java
new file mode 100644
index 000000000..e59172162
--- /dev/null
+++
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/ShowTaskCommand.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.shell.commands.scheduler;
+
+import org.apache.karaf.shell.api.action.Action;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Reference;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.unomi.api.services.SchedulerService;
+import org.apache.unomi.api.tasks.ScheduledTask;
+
+import java.text.SimpleDateFormat;
+import java.util.Map;
+
+@Command(scope = "unomi", name = "task-show", description = "Shows detailed
information about a task")
+@Service
+public class ShowTaskCommand implements Action {
+
+ @Reference
+ private SchedulerService schedulerService;
+
+ @Argument(index = 0, name = "taskId", description = "The ID of the task to
show", required = true)
+ private String taskId;
+
+ @Override
+ public Object execute() throws Exception {
+ ScheduledTask task = schedulerService.getTask(taskId);
+ if (task == null) {
+ System.err.println("Task not found: " + taskId);
+ return null;
+ }
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss");
+
+ // Print basic information
+ System.out.println("Task Details");
+ System.out.println("-----------");
+ System.out.println("ID: " + task.getItemId());
+ System.out.println("Type: " + task.getTaskType());
+ System.out.println("Status: " + task.getStatus());
+ System.out.println("Persistent: " + task.isPersistent());
+ System.out.println("Parallel Execution: " +
task.isAllowParallelExecution());
+ System.out.println("Fixed Rate: " + task.isFixedRate());
+ System.out.println("One Shot: " + task.isOneShot());
+
+ // Print timing information
+ if (task.getNextScheduledExecution() != null) {
+ System.out.println("Next Run: " +
dateFormat.format(task.getNextScheduledExecution()));
+ }
+ if (task.getLastExecutionDate() != null) {
+ System.out.println("Last Run: " +
dateFormat.format(task.getLastExecutionDate()));
+ }
+ System.out.println("Initial Delay: " + task.getInitialDelay() + " " +
task.getTimeUnit());
+ System.out.println("Period: " + task.getPeriod() + " " +
task.getTimeUnit());
+
+ // Print execution information
+ System.out.println("Failure Count: " + task.getFailureCount());
+ if (task.getLastError() != null) {
+ System.out.println("Last Error: " + task.getLastError());
+ }
+
+ // Print parameters if any
+ Map<String, Object> parameters = task.getParameters();
+ if (parameters != null && !parameters.isEmpty()) {
+ System.out.println("\nParameters");
+ System.out.println("----------");
+ for (Map.Entry<String, Object> entry : parameters.entrySet()) {
+ System.out.println(entry.getKey() + ": " + entry.getValue());
+ }
+ }
+
+ // Print checkpoint data if any
+ Map<String, Object> checkpointData = task.getCheckpointData();
+ if (checkpointData != null && !checkpointData.isEmpty()) {
+ System.out.println("\nCheckpoint Data");
+ System.out.println("--------------");
+ for (Map.Entry<String, Object> entry : checkpointData.entrySet()) {
+ System.out.println(entry.getKey() + ": " + entry.getValue());
+ }
+ }
+
+ return null;
+ }
+}