This is an automated email from the ASF dual-hosted git repository.

shuber pushed a commit to branch UNOMI-878-new-scheduler
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/UNOMI-878-new-scheduler by 
this push:
     new 957f0259e UNOMI-878: Enhanced Cluster-Aware Task Scheduling Service 
with Improved Developer Experience and Persistence Integration
957f0259e is described below

commit 957f0259e3c65668a94e34bea2592c49da660452
Author: Serge Huber <[email protected]>
AuthorDate: Mon Sep 1 14:44:00 2025 +0200

    UNOMI-878: Enhanced Cluster-Aware Task Scheduling Service with Improved 
Developer Experience and Persistence Integration
---
 .../org/apache/unomi/api/ExecutionContext.java     |  98 ++++++++
 .../api/services/ExecutionContextManager.java      |  78 ++++++
 .../geonames/services/GeonamesServiceImpl.java     | 266 +++++++++++++--------
 .../services/impl/GroovyActionsServiceImpl.java    |  46 ++--
 .../unomi/schema/impl/SchemaServiceImpl.java       |  44 ++--
 .../resources/OSGI-INF/blueprint/blueprint.xml     |   4 +-
 .../actions/MergeProfilesOnPropertyAction.java     |  72 ++++--
 .../services/impl/cluster/ClusterServiceImpl.java  |  20 +-
 .../impl/definitions/DefinitionsServiceImpl.java   |  26 +-
 .../services/impl/profiles/ProfileServiceImpl.java |  38 +--
 .../services/impl/rules/RulesServiceImpl.java      |  32 ++-
 .../services/impl/scope/ScopeServiceImpl.java      |  18 +-
 .../services/impl/segments/SegmentServiceImpl.java |  26 +-
 .../resources/OSGI-INF/blueprint/blueprint.xml     |   2 -
 .../commands/scheduler/CancelTaskCommand.java      |  42 ++++
 .../shell/commands/scheduler/ListTasksCommand.java | 135 +++++++++++
 .../commands/scheduler/PurgeTasksCommand.java      |  93 +++++++
 .../shell/commands/scheduler/RetryTaskCommand.java |  47 ++++
 .../commands/scheduler/SetExecutorNodeCommand.java |  54 +++++
 .../shell/commands/scheduler/ShowTaskCommand.java  |  99 ++++++++
 20 files changed, 1020 insertions(+), 220 deletions(-)

diff --git a/api/src/main/java/org/apache/unomi/api/ExecutionContext.java 
b/api/src/main/java/org/apache/unomi/api/ExecutionContext.java
new file mode 100644
index 000000000..1fcf5a7ba
--- /dev/null
+++ b/api/src/main/java/org/apache/unomi/api/ExecutionContext.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.api;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Stack;
+
+/**
+ * Represents the execution context for operations in Unomi, including
+ * security and tenant information.
+ *
+ * <p>A context holds the active tenant ID, the roles and permissions it was created with, and a
+ * history of previously active tenant IDs so that a temporary tenant switch made with
+ * {@link #setTenant(String)} can be undone with {@link #restorePreviousTenant()}.
+ *
+ * <p>The system flag is decided once, at construction time, from the initial tenant ID. Switching
+ * tenants afterwards does not change it.
+ */
+public class ExecutionContext {
+    /** Tenant ID that marks a context as the system context. */
+    public static final String SYSTEM_TENANT = "system";
+
+    private String tenantId;
+    private final Set<String> roles = new HashSet<>();
+    private final Set<String> permissions = new HashSet<>();
+    // Previously active tenant IDs, most recent on top. Stack is kept (instead of ArrayDeque)
+    // because the saved tenant ID may be null, which ArrayDeque rejects.
+    private final Stack<String> tenantStack = new Stack<>();
+    private final boolean isSystem;
+
+    /**
+     * Creates a context for the given tenant.
+     *
+     * @param tenantId the initial tenant ID; the context is a system context when this equals
+     *                 {@link #SYSTEM_TENANT}
+     * @param roles the roles to copy into the context, or {@code null} for none
+     * @param permissions the permissions to copy into the context, or {@code null} for none
+     */
+    public ExecutionContext(String tenantId, Set<String> roles, Set<String> permissions) {
+        this.tenantId = tenantId;
+        // Constant-first comparison is null-safe, so a null tenant simply yields a non-system context.
+        this.isSystem = SYSTEM_TENANT.equals(tenantId);
+        if (roles != null) {
+            this.roles.addAll(roles);
+        }
+        if (permissions != null) {
+            this.permissions.addAll(permissions);
+        }
+    }
+
+    /**
+     * Creates a system context: tenant {@link #SYSTEM_TENANT}, no explicit roles or permissions.
+     *
+     * @return a new system context
+     */
+    public static ExecutionContext systemContext() {
+        // The constructor already flags the system tenant, so no extra assignment is needed.
+        return new ExecutionContext(SYSTEM_TENANT, null, null);
+    }
+
+    /**
+     * @return the currently active tenant ID
+     */
+    public String getTenantId() {
+        return tenantId;
+    }
+
+    /**
+     * @return a copy of the roles held by this context; changes to it do not affect the context
+     */
+    public Set<String> getRoles() {
+        return new HashSet<>(roles);
+    }
+
+    /**
+     * @return a copy of the permissions held by this context; changes to it do not affect the context
+     */
+    public Set<String> getPermissions() {
+        return new HashSet<>(permissions);
+    }
+
+    /**
+     * @return {@code true} if this context was created for the system tenant
+     */
+    public boolean isSystem() {
+        return isSystem;
+    }
+
+    /**
+     * Switches the active tenant, remembering the current one so it can be restored later.
+     *
+     * @param tenantId the tenant ID to make active
+     */
+    public void setTenant(String tenantId) {
+        tenantStack.push(this.tenantId);
+        this.tenantId = tenantId;
+    }
+
+    /**
+     * Restores the tenant that was active before the most recent {@link #setTenant(String)} call.
+     * Does nothing when there is no remembered tenant.
+     */
+    public void restorePreviousTenant() {
+        if (!tenantStack.isEmpty()) {
+            this.tenantId = tenantStack.pop();
+        }
+    }
+
+    /**
+     * Checks that this context may perform the given operation.
+     *
+     * @param operation the permission name required for the operation
+     * @throws SecurityException if this is not a system context and the permission is missing
+     */
+    public void validateAccess(String operation) {
+        // hasPermission() already grants everything to a system context.
+        if (!hasPermission(operation)) {
+            throw new SecurityException("Access denied: Missing permission for operation " + operation
+                    + " for tenant " + tenantId + " and roles " + roles);
+        }
+    }
+
+    /**
+     * @param permission the permission name to look up
+     * @return {@code true} for a system context, or when the permission is held by this context
+     */
+    public boolean hasPermission(String permission) {
+        return isSystem || permissions.contains(permission);
+    }
+
+    /**
+     * @param role the role name to look up
+     * @return {@code true} for a system context, or when the role is held by this context
+     */
+    public boolean hasRole(String role) {
+        return isSystem || roles.contains(role);
+    }
+}
\ No newline at end of file
diff --git 
a/api/src/main/java/org/apache/unomi/api/services/ExecutionContextManager.java 
b/api/src/main/java/org/apache/unomi/api/services/ExecutionContextManager.java
new file mode 100644
index 000000000..da1ab18a0
--- /dev/null
+++ 
b/api/src/main/java/org/apache/unomi/api/services/ExecutionContextManager.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.api.services;
+
+import org.apache.unomi.api.ExecutionContext;
+
+import java.util.function.Supplier;
+
+/**
+ * Service interface for managing execution contexts in Unomi.
+ *
+ * <p>It gives access to the current {@link ExecutionContext}, creates contexts for a tenant, and
+ * offers helpers that run a {@link Supplier} or {@link Runnable} as the system user or as a
+ * specific tenant.
+ */
+public interface ExecutionContextManager {
+
+    /**
+     * Gets the current execution context.
+     * @return the current execution context
+     */
+    ExecutionContext getCurrentContext();
+
+    /**
+     * Sets the current execution context.
+     * @param context the context to set as current
+     */
+    void setCurrentContext(ExecutionContext context);
+
+    /**
+     * Executes an operation as the system user.
+     * @param operation the operation to execute
+     * @param <T> the return type of the operation
+     * @return the result of the operation
+     */
+    <T> T executeAsSystem(Supplier<T> operation);
+
+    /**
+     * Executes an operation as the system user without return value.
+     * @param operation the operation to execute
+     */
+    void executeAsSystem(Runnable operation);
+
+    /**
+     * Executes an operation as a specific tenant.
+     * This method creates a tenant context, executes the operation, and
+     * ensures proper cleanup.
+     * @param tenantId the ID of the tenant to execute as
+     * @param operation the operation to execute
+     * @param <T> the return type of the operation
+     * @return the result of the operation
+     */
+    <T> T executeAsTenant(String tenantId, Supplier<T> operation);
+
+    /**
+     * Executes an operation as a specific tenant without return value.
+     * This method creates a tenant context, executes the operation, and
+     * ensures proper cleanup.
+     * @param tenantId the ID of the tenant to execute as
+     * @param operation the operation to execute
+     */
+    void executeAsTenant(String tenantId, Runnable operation);
+
+    /**
+     * Creates a new execution context for the given tenant.
+     * @param tenantId the tenant ID
+     * @return the created execution context
+     */
+    ExecutionContext createContext(String tenantId);
+}
diff --git 
a/extensions/geonames/services/src/main/java/org/apache/unomi/geonames/services/GeonamesServiceImpl.java
 
b/extensions/geonames/services/src/main/java/org/apache/unomi/geonames/services/GeonamesServiceImpl.java
index a19725035..f84c19415 100644
--- 
a/extensions/geonames/services/src/main/java/org/apache/unomi/geonames/services/GeonamesServiceImpl.java
+++ 
b/extensions/geonames/services/src/main/java/org/apache/unomi/geonames/services/GeonamesServiceImpl.java
@@ -17,12 +17,15 @@
 
 package org.apache.unomi.geonames.services;
 
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.unomi.api.PartialList;
 import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.services.DefinitionsService;
+import org.apache.unomi.api.services.ExecutionContextManager;
 import org.apache.unomi.api.services.SchedulerService;
+import org.apache.unomi.api.tasks.TaskExecutor;
+import org.apache.unomi.api.tasks.TaskExecutor.TaskStatusCallback;
+import org.apache.unomi.api.tasks.ScheduledTask;
 import org.apache.unomi.persistence.spi.PersistenceService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +46,7 @@ public class GeonamesServiceImpl implements GeonamesService {
     private DefinitionsService definitionsService;
     private PersistenceService persistenceService;
     private SchedulerService schedulerService;
+    private ExecutionContextManager contextManager;
 
     private String pathToGeonamesDatabase;
     private Boolean forceDbImport;
@@ -64,6 +68,10 @@ public class GeonamesServiceImpl implements GeonamesService {
         this.schedulerService = schedulerService;
     }
 
+    /**
+     * Sets the execution context manager this service uses to run its persistence and
+     * scheduling work as the system user.
+     *
+     * @param contextManager the execution context manager to use
+     */
+    public void setContextManager(ExecutionContextManager contextManager) {
+        this.contextManager = contextManager;
+    }
+
     public void setPathToGeonamesDatabase(String pathToGeonamesDatabase) {
         this.pathToGeonamesDatabase = pathToGeonamesDatabase;
     }
@@ -79,47 +87,99 @@ public class GeonamesServiceImpl implements GeonamesService 
{
     public void stop() {
     }
 
-    public void importDatabase() {
-        if (!persistenceService.createIndex(GeonameEntry.ITEM_TYPE)) {
-            if (forceDbImport) {
-                persistenceService.removeIndex(GeonameEntry.ITEM_TYPE);
-                persistenceService.createIndex(GeonameEntry.ITEM_TYPE);
-                LOGGER.info("Geonames index removed and recreated");
-            } else if 
(persistenceService.getAllItemsCount(GeonameEntry.ITEM_TYPE) > 0) {
-                return;
-            }
-        } else {
-            LOGGER.info("Geonames index created");
+    private static class GeonamesImportTaskExecutor implements TaskExecutor {
+        private final GeonamesServiceImpl service;
+        private final File databaseFile;
+
+        public GeonamesImportTaskExecutor(GeonamesServiceImpl service, File 
databaseFile) {
+            this.service = service;
+            this.databaseFile = databaseFile;
         }
 
-        if (pathToGeonamesDatabase == null) {
-            LOGGER.info("No geonames DB provided");
-            return;
+        @Override
+        public String getTaskType() {
+            return "geonames-import";
         }
-        final File f = new File(pathToGeonamesDatabase);
-        if (f.exists()) {
-            schedulerService.getSharedScheduleExecutorService().schedule(new 
TimerTask() {
-                @Override
-                public void run() {
-                    importGeoNameDatabase(f);
+
+        @Override
+        public void execute(ScheduledTask task, TaskStatusCallback 
statusCallback) throws Exception {
+            service.contextManager.executeAsSystem(() -> {
+                try {
+                    service.importGeoNameDatabase(databaseFile);
+                    statusCallback.complete();
+                } catch (Exception e) {
+                    LOGGER.error("Error importing geoname database", e);
+                    statusCallback.fail(e.getMessage());
                 }
-            }, refreshDbInterval, TimeUnit.MILLISECONDS);
+                return null;
+            });
+        }
+    }
+
+    /**
+     * Task executor for the delayed retry that {@code importGeoNameDatabase} schedules when the
+     * type mappings are not installed yet. It mirrors {@code GeonamesImportTaskExecutor}: the
+     * import runs as the system user, and the outcome is reported through the status callback.
+     */
+    private static class GeonamesImportRetryTaskExecutor implements TaskExecutor {
+        private final GeonamesServiceImpl service;
+        private final File databaseFile;
+
+        /**
+         * @param service the service whose import routine is invoked
+         * @param databaseFile the geonames database file to import
+         */
+        public GeonamesImportRetryTaskExecutor(GeonamesServiceImpl service, File databaseFile) {
+            this.service = service;
+            this.databaseFile = databaseFile;
+        }
+
+        @Override
+        public String getTaskType() {
+            return "geonames-import-retry";
         }
+
+        /**
+         * Runs the import inside a system execution context.
+         *
+         * @param task the scheduled task being executed (not read here)
+         * @param statusCallback notified with {@code complete()} on success or {@code fail(...)} on error
+         */
+        @Override
+        public void execute(ScheduledTask task, TaskStatusCallback statusCallback) throws Exception {
+            // The retry fires from the scheduler, not from importDatabase(), so it has to
+            // establish the system context itself, exactly like the initial import task does.
+            service.contextManager.executeAsSystem(() -> {
+                try {
+                    service.importGeoNameDatabase(databaseFile);
+                    statusCallback.complete();
+                } catch (Exception e) {
+                    LOGGER.error("Error importing geoname database", e);
+                    statusCallback.fail(e.getMessage());
+                }
+                return null;
+            });
+        }
+    }
+
+    /**
+     * Prepares the geonames index and, when a database file is configured and present, schedules
+     * a one-shot, non-persistent "geonames-import" task after {@code refreshDbInterval}
+     * milliseconds. The whole routine runs as the system user.
+     */
+    public void importDatabase() {
+        contextManager.executeAsSystem(() -> {
+            // NOTE(review): a false result from createIndex is treated here as "index already
+            // exists" - confirm against the PersistenceService contract.
+            if (!persistenceService.createIndex(GeonameEntry.ITEM_TYPE)) {
+                if (forceDbImport) {
+                    // Forced import: drop the existing index and start from an empty one.
+                    persistenceService.removeIndex(GeonameEntry.ITEM_TYPE);
+                    persistenceService.createIndex(GeonameEntry.ITEM_TYPE);
+                    LOGGER.info("Geonames index removed and recreated");
+                } else if 
(persistenceService.getAllItemsCount(GeonameEntry.ITEM_TYPE) > 0) {
+                    // Entries are already stored and no forced import was requested: nothing to do.
+                    return;
+                }
+            } else {
+                LOGGER.info("Geonames index created");
+            }
+
+            if (pathToGeonamesDatabase == null) {
+                LOGGER.info("No geonames DB provided");
+                return;
+            }
+            final File f = new File(pathToGeonamesDatabase);
+            // A configured path whose file is missing is skipped without logging.
+            if (f.exists()) {
+                schedulerService.newTask("geonames-import")
+                    .withInitialDelay(refreshDbInterval, TimeUnit.MILLISECONDS)
+                    .asOneShot()
+                    .withExecutor(new GeonamesImportTaskExecutor(this, f))
+                    .nonPersistent()
+                    .schedule();
+            }
+        });
     }
 
     private void importGeoNameDatabase(final File f) {
         Map<String,Map<String,Object>> typeMappings = 
persistenceService.getPropertiesMapping(GeonameEntry.ITEM_TYPE);
         if (typeMappings == null || typeMappings.size() == 0) {
             LOGGER.warn("Type mappings for type {} are not yet installed, 
delaying import until they are ready!", GeonameEntry.ITEM_TYPE);
-            schedulerService.getSharedScheduleExecutorService().schedule(new 
TimerTask() {
-                @Override
-                public void run() {
-                    importGeoNameDatabase(f);
-                }
-            }, refreshDbInterval, TimeUnit.MILLISECONDS);
+            schedulerService.newTask("geonames-import-retry")
+                .withInitialDelay(refreshDbInterval, TimeUnit.MILLISECONDS)
+                .asOneShot()
+                .withExecutor(new GeonamesImportRetryTaskExecutor(this, f))
+                .nonPersistent()
+                .schedule();
             return;
         } else {
-            // let's check that the mappings are correct
+            // @TODO: let's check that the mappings are correct
         }
         try {
 
@@ -229,48 +289,50 @@ public class GeonamesServiceImpl implements 
GeonamesService {
     }
 
     public List<GeonameEntry> reverseGeoCode(String lat, String lon) {
-        List<Condition> l = new ArrayList<Condition>();
-        Condition andCondition = new Condition();
-        
andCondition.setConditionType(definitionsService.getConditionType("booleanCondition"));
-        andCondition.setParameter("operator", "and");
-        andCondition.setParameter("subConditions", l);
-
-
-        Condition geoLocation = new Condition();
-        
geoLocation.setConditionType(definitionsService.getConditionType("geoLocationByPointSessionCondition"));
-        geoLocation.setParameter("type", "circle");
-        geoLocation.setParameter("circleLatitude", Double.parseDouble(lat));
-        geoLocation.setParameter("circleLongitude", Double.parseDouble(lon));
-        geoLocation.setParameter("distance", GEOCODING_MAX_DISTANCE);
-        l.add(geoLocation);
-
-        l.add(getPropertyCondition("featureCode", "propertyValues", 
CITIES_FEATURE_CODES, "in"));
-
-        PartialList<GeonameEntry> list = 
persistenceService.query(andCondition, "geo:location:" + lat + ":" + lon, 
GeonameEntry.class, 0, 1);
-        if (!list.getList().isEmpty()) {
-            return getHierarchy(list.getList().get(0));
-        }
-        return Collections.emptyList();
+        return contextManager.executeAsSystem(() -> {
+            List<Condition> l = new ArrayList<Condition>();
+            Condition andCondition = new Condition();
+            
andCondition.setConditionType(definitionsService.getConditionType("booleanCondition"));
+            andCondition.setParameter("operator", "and");
+            andCondition.setParameter("subConditions", l);
+
+            Condition geoLocation = new Condition();
+            
geoLocation.setConditionType(definitionsService.getConditionType("geoLocationByPointSessionCondition"));
+            geoLocation.setParameter("type", "circle");
+            geoLocation.setParameter("circleLatitude", 
Double.parseDouble(lat));
+            geoLocation.setParameter("circleLongitude", 
Double.parseDouble(lon));
+            geoLocation.setParameter("distance", GEOCODING_MAX_DISTANCE);
+            l.add(geoLocation);
+
+            l.add(getPropertyCondition("featureCode", "propertyValues", 
CITIES_FEATURE_CODES, "in"));
+
+            PartialList<GeonameEntry> list = 
persistenceService.query(andCondition, "geo:location:" + lat + ":" + lon, 
GeonameEntry.class, 0, 1);
+            if (!list.getList().isEmpty()) {
+                return getHierarchy(list.getList().get(0));
+            }
+            return Collections.emptyList();
+        });
     }
 
-
     public PartialList<GeonameEntry> getChildrenEntries(List<String> items, 
int offset, int size) {
-        Condition andCondition = getItemsInChildrenQuery(items, 
CITIES_FEATURE_CODES);
-        Condition featureCodeCondition = ((List<Condition>) 
andCondition.getParameter("subConditions")).get(0);
-        int level = items.size();
-
-        featureCodeCondition.setParameter("propertyValues", 
ORDERED_FEATURES.get(level));
-        PartialList<GeonameEntry> r = persistenceService.query(andCondition, 
null, GeonameEntry.class, offset, size);
-        while (r.size() == 0 && level < ORDERED_FEATURES.size() - 1) {
-            level++;
+        return contextManager.executeAsSystem(() -> {
+            Condition andCondition = getItemsInChildrenQuery(items, 
CITIES_FEATURE_CODES);
+            Condition featureCodeCondition = ((List<Condition>) 
andCondition.getParameter("subConditions")).get(0);
+            int level = items.size();
+
             featureCodeCondition.setParameter("propertyValues", 
ORDERED_FEATURES.get(level));
-            r = persistenceService.query(andCondition, null, 
GeonameEntry.class, offset, size);
-        }
-        return r;
+            PartialList<GeonameEntry> r = 
persistenceService.query(andCondition, null, GeonameEntry.class, offset, size);
+            while (r.size() == 0 && level < ORDERED_FEATURES.size() - 1) {
+                level++;
+                featureCodeCondition.setParameter("propertyValues", 
ORDERED_FEATURES.get(level));
+                r = persistenceService.query(andCondition, null, 
GeonameEntry.class, offset, size);
+            }
+            return r;
+        });
     }
 
     public PartialList<GeonameEntry> getChildrenCities(List<String> items, int 
offset, int size) {
-        return persistenceService.query(getItemsInChildrenQuery(items, 
CITIES_FEATURE_CODES), null, GeonameEntry.class, offset, size);
+        return contextManager.executeAsSystem(() -> 
persistenceService.query(getItemsInChildrenQuery(items, CITIES_FEATURE_CODES), 
null, GeonameEntry.class, offset, size));
     }
 
     private Condition getItemsInChildrenQuery(List<String> items, List<String> 
featureCodes) {
@@ -296,45 +358,47 @@ public class GeonamesServiceImpl implements 
GeonamesService {
     }
 
     public List<GeonameEntry> getCapitalEntries(String itemId) {
-        GeonameEntry entry = persistenceService.load(itemId, 
GeonameEntry.class);
-        List<String> featureCodes;
-
-        List<Condition> l = new ArrayList<Condition>();
-        Condition andCondition = new Condition();
-        
andCondition.setConditionType(definitionsService.getConditionType("booleanCondition"));
-        andCondition.setParameter("operator", "and");
-        andCondition.setParameter("subConditions", l);
+        return contextManager.executeAsSystem(() -> {
+            GeonameEntry entry = persistenceService.load(itemId, 
GeonameEntry.class);
+            List<String> featureCodes;
+
+            List<Condition> l = new ArrayList<Condition>();
+            Condition andCondition = new Condition();
+            
andCondition.setConditionType(definitionsService.getConditionType("booleanCondition"));
+            andCondition.setParameter("operator", "and");
+            andCondition.setParameter("subConditions", l);
+
+            l.add(getPropertyCondition("countryCode", "propertyValue", 
entry.getCountryCode(), "equals"));
+
+            if (COUNTRY_FEATURE_CODES.contains(entry.getFeatureCode())) {
+                featureCodes = Arrays.asList("PPLC");
+            } else if (ADM1_FEATURE_CODES.contains(entry.getFeatureCode())) {
+                featureCodes = Arrays.asList("PPLA", "PPLC");
+                l.add(getPropertyCondition("admin1Code", "propertyValue", 
entry.getAdmin1Code(), "equals"));
+            } else if (ADM2_FEATURE_CODES.contains(entry.getFeatureCode())) {
+                featureCodes = Arrays.asList("PPLA2", "PPLA", "PPLC");
+                l.add(getPropertyCondition("admin1Code", "propertyValue", 
entry.getAdmin1Code(), "equals"));
+                l.add(getPropertyCondition("admin2Code", "propertyValue", 
entry.getAdmin2Code(), "equals"));
+            } else {
+                return Collections.emptyList();
+            }
 
-        l.add(getPropertyCondition("countryCode", "propertyValue", 
entry.getCountryCode(), "equals"));
-
-        if (COUNTRY_FEATURE_CODES.contains(entry.getFeatureCode())) {
-            featureCodes = Arrays.asList("PPLC");
-        } else if (ADM1_FEATURE_CODES.contains(entry.getFeatureCode())) {
-            featureCodes = Arrays.asList("PPLA", "PPLC");
-            l.add(getPropertyCondition("admin1Code", "propertyValue", 
entry.getAdmin1Code(), "equals"));
-        } else if (ADM2_FEATURE_CODES.contains(entry.getFeatureCode())) {
-            featureCodes = Arrays.asList("PPLA2", "PPLA", "PPLC");
-            l.add(getPropertyCondition("admin1Code", "propertyValue", 
entry.getAdmin1Code(), "equals"));
-            l.add(getPropertyCondition("admin2Code", "propertyValue", 
entry.getAdmin2Code(), "equals"));
-        } else {
+            Condition featureCodeCondition = new Condition();
+            
featureCodeCondition.setConditionType(definitionsService.getConditionType("sessionPropertyCondition"));
+            featureCodeCondition.setParameter("propertyName", "featureCode");
+            featureCodeCondition.setParameter("propertyValues", featureCodes);
+            featureCodeCondition.setParameter("comparisonOperator", "in");
+            l.add(featureCodeCondition);
+            List<GeonameEntry> entries = 
persistenceService.query(andCondition, null, GeonameEntry.class);
+            if (entries.size() == 0) {
+                featureCodeCondition.setParameter("propertyValues", 
CITIES_FEATURE_CODES);
+                entries = persistenceService.query(andCondition, 
"population:desc", GeonameEntry.class, 0, 1).getList();
+            }
+            if (entries.size() > 0) {
+                return getHierarchy(entries.get(0));
+            }
             return Collections.emptyList();
-        }
-
-        Condition featureCodeCondition = new Condition();
-        
featureCodeCondition.setConditionType(definitionsService.getConditionType("sessionPropertyCondition"));
-        featureCodeCondition.setParameter("propertyName", "featureCode");
-        featureCodeCondition.setParameter("propertyValues", featureCodes);
-        featureCodeCondition.setParameter("comparisonOperator", "in");
-        l.add(featureCodeCondition);
-        List<GeonameEntry> entries = persistenceService.query(andCondition, 
null, GeonameEntry.class);
-        if (entries.size() == 0) {
-            featureCodeCondition.setParameter("propertyValues", 
CITIES_FEATURE_CODES);
-            entries = persistenceService.query(andCondition, 
"population:desc", GeonameEntry.class, 0, 1).getList();
-        }
-        if (entries.size() > 0) {
-            return getHierarchy(entries.get(0));
-        }
-        return Collections.emptyList();
+        });
     }
 
     private Condition getPropertyCondition(String name, String 
propertyValueField, Object value, String operator) {
diff --git 
a/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java
 
b/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java
index 3ad70b69b..fee5f3a89 100644
--- 
a/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java
+++ 
b/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java
@@ -36,7 +36,10 @@ import org.codehaus.groovy.control.CompilerConfiguration;
 import org.codehaus.groovy.control.customizers.ImportCustomizer;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.wiring.BundleWiring;
-import org.osgi.service.component.annotations.*;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.metatype.annotations.Designate;
 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
 import org.slf4j.Logger;
@@ -46,12 +49,10 @@ import java.io.IOException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.HashSet;
-import java.util.Set;
-
 import java.util.Map;
+import java.util.Set;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -74,7 +75,6 @@ public class GroovyActionsServiceImpl implements 
GroovyActionsService {
     private BundleContext bundleContext;
     private GroovyScriptEngine groovyScriptEngine;
     private CompilerConfiguration compilerConfiguration;
-    private ScheduledFuture<?> scheduledFuture;
 
     private final Object compilationLock = new Object();
     private GroovyShell compilationShell;
@@ -88,6 +88,7 @@ public class GroovyActionsServiceImpl implements 
GroovyActionsService {
     private DefinitionsService definitionsService;
     private PersistenceService persistenceService;
     private SchedulerService schedulerService;
+    private String refreshGroovyActionsTaskId;
     private GroovyActionsServiceConfig config;
 
     @Reference
@@ -103,9 +104,12 @@ public class GroovyActionsServiceImpl implements 
GroovyActionsService {
     @Reference
     public void setSchedulerService(SchedulerService schedulerService) {
         this.schedulerService = schedulerService;
-    }
-
 
+        // A bind method can be invoked before the component is activated. initializeTimers()
+        // reads the refresh interval from config (presumably stored by start()), so the timers
+        // are only started from here once the configuration is available. When it is not,
+        // start() schedules them itself as soon as it runs with a scheduler present.
+        if (schedulerService != null && config != null) {
+            LOGGER.info("SchedulerService was set after GroovyActionsService initialization, initializing scheduled tasks now");
+            initializeTimers();
+        }
+    }
 
     @Activate
     public void start(GroovyActionsServiceConfig config, BundleContext 
bundleContext) {
@@ -130,15 +134,19 @@ public class GroovyActionsServiceImpl implements 
GroovyActionsService {
         // PRE-COMPILE ALL SCRIPTS AT STARTUP (no on-demand compilation)
         preloadAllScripts();
 
-        initializeTimers();
+        if (schedulerService != null) {
+            initializeTimers();
+        } else {
+            LOGGER.warn("SchedulerService not available during 
GroovyActionsService initialization. Scheduled tasks will not be registered. 
They will be registered when SchedulerService becomes available.");
+        }
         LOGGER.info("Groovy action service initialized with {} scripts", 
scriptMetadataCache.size());
     }
 
     @Deactivate
     public void onDestroy() {
         LOGGER.debug("onDestroy Method called");
-        if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
-            scheduledFuture.cancel(true);
+        if (schedulerService != null && refreshGroovyActionsTaskId != null) {
+            schedulerService.cancelTask(refreshGroovyActionsTaskId);
         }
     }
 
@@ -342,7 +350,7 @@ public class GroovyActionsServiceImpl implements 
GroovyActionsService {
 
         ScriptMetadata removedMetadata = 
scriptMetadataCache.remove(actionName);
         persistenceService.remove(actionName, GroovyAction.class);
-        
+
         // Clean up error tracking to prevent memory leak
         loggedRefreshErrors.remove(actionName);
 
@@ -445,29 +453,29 @@ public class GroovyActionsServiceImpl implements 
GroovyActionsService {
                 }
 
                 errorCount++;
-                
+
                 // Prevent log spam for repeated compilation errors during 
refresh
                 String errorMessage = e.getMessage();
                 Set<String> scriptErrors = loggedRefreshErrors.get(actionName);
-                
+
                 if (scriptErrors == null || 
!scriptErrors.contains(errorMessage)) {
                     newErrorCount++;
                     LOGGER.error("Failed to refresh script: {}", actionName, 
e);
-                    
+
                     // Prevent memory leak by limiting tracked errors before 
adding new entries
                     if (scriptErrors == null && loggedRefreshErrors.size() >= 
MAX_LOGGED_ERRORS) {
                         // Remove one random entry to make space (simple 
eviction)
                         String firstKey = 
loggedRefreshErrors.keySet().iterator().next();
                         loggedRefreshErrors.remove(firstKey);
                     }
-                    
+
                     // Now safely add the error
                     if (scriptErrors == null) {
                         scriptErrors = ConcurrentHashMap.newKeySet();
                         loggedRefreshErrors.put(actionName, scriptErrors);
                     }
                     scriptErrors.add(errorMessage);
-                    
+
                     LOGGER.warn("Keeping existing version of script {} due to 
compilation error", actionName);
                 }
 
@@ -502,7 +510,9 @@ public class GroovyActionsServiceImpl implements 
GroovyActionsService {
                 refreshGroovyActions();
             }
         };
-        scheduledFuture = 
schedulerService.getScheduleExecutorService().scheduleWithFixedDelay(task, 0, 
config.services_groovy_actions_refresh_interval(),
-                TimeUnit.MILLISECONDS);
+        if (this.refreshGroovyActionsTaskId != null) {
+            schedulerService.cancelTask(this.refreshGroovyActionsTaskId);
+        }
+        this.refreshGroovyActionsTaskId = 
schedulerService.createRecurringTask("refreshGroovyActions", 
config.services_groovy_actions_refresh_interval(), TimeUnit.MILLISECONDS, task, 
false).getItemId();
     }
 }
diff --git 
a/extensions/json-schema/services/src/main/java/org/apache/unomi/schema/impl/SchemaServiceImpl.java
 
b/extensions/json-schema/services/src/main/java/org/apache/unomi/schema/impl/SchemaServiceImpl.java
index da7efeac2..f7e68b783 100644
--- 
a/extensions/json-schema/services/src/main/java/org/apache/unomi/schema/impl/SchemaServiceImpl.java
+++ 
b/extensions/json-schema/services/src/main/java/org/apache/unomi/schema/impl/SchemaServiceImpl.java
@@ -27,7 +27,9 @@ import com.networknt.schema.*;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.unomi.api.Item;
+import org.apache.unomi.api.services.SchedulerService;
 import org.apache.unomi.api.services.ScopeService;
+import org.apache.unomi.api.tasks.ScheduledTask;
 import org.apache.unomi.persistence.spi.PersistenceService;
 import org.apache.unomi.schema.api.JsonSchemaWrapper;
 import org.apache.unomi.schema.api.SchemaService;
@@ -47,10 +49,8 @@ import java.util.stream.Collectors;
 public class SchemaServiceImpl implements SchemaService {
 
     private static final String URI = 
"https://json-schema.org/draft/2019-09/schema";;
-
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SchemaServiceImpl.class.getName());
     private static final String TARGET_EVENTS = "events";
-
     private static final String GENERIC_ERROR_KEY = "error";
 
     ObjectMapper objectMapper = new ObjectMapper();
@@ -67,18 +67,12 @@ public class SchemaServiceImpl implements SchemaService {
      * Available extensions indexed by key:schema URI to be extended, value: 
list of schema extension URIs
      */
     private ConcurrentMap<String, Set<String>> extensions = new 
ConcurrentHashMap<>();
-
     private Integer jsonSchemaRefreshInterval = 1000;
-    private ScheduledFuture<?> scheduledFuture;
-
     private PersistenceService persistenceService;
     private ScopeService scopeService;
-
     private JsonSchemaFactory jsonSchemaFactory;
-
-    // TODO UNOMI-572: when fixing UNOMI-572 please remove the usage of the 
custom ScheduledExecutorService and re-introduce the Unomi Scheduler Service
-    private ScheduledExecutorService scheduler;
-    //private SchedulerService schedulerService;
+    private SchedulerService schedulerService;
+    private String refreshJSONSchemasTaskId;
 
     @Override
     public boolean isValid(String data, String schemaId) {
@@ -378,14 +372,22 @@ public class SchemaServiceImpl implements SchemaService {
         TimerTask task = new TimerTask() {
             @Override
             public void run() {
-                try {
-                    refreshJSONSchemas();
-                } catch (Exception e) {
-                    LOGGER.error("Unexpected error while refreshing JSON 
Schemas", e);
-                }
+            try {
+                refreshJSONSchemas();
+            } catch (Exception e) {
+                LOGGER.error("Unexpected error while refreshing JSON Schemas", 
e);
+            }
             }
         };
-        scheduledFuture = scheduler.scheduleWithFixedDelay(task, 0, 
jsonSchemaRefreshInterval, TimeUnit.MILLISECONDS);
+        this.resetTimers();
+        this.refreshJSONSchemasTaskId = 
schedulerService.createRecurringTask("refreshJSONSchemas", 
jsonSchemaRefreshInterval, TimeUnit.MILLISECONDS, task, false).getItemId();
+    }
+
+    private void resetTimers() {
+        if (this.refreshJSONSchemasTaskId != null) {
+            schedulerService.cancelTask(this.refreshJSONSchemasTaskId);
+            this.refreshJSONSchemasTaskId = null;
+        }
     }
 
     private void initJsonSchemaFactory() {
@@ -414,17 +416,13 @@ public class SchemaServiceImpl implements SchemaService {
     }
 
     public void init() {
-        scheduler = Executors.newSingleThreadScheduledExecutor();
-        initJsonSchemaFactory();
-        initTimers();
+        this.initJsonSchemaFactory();
+        this.initTimers();
         LOGGER.info("Schema service initialized.");
     }
 
     public void destroy() {
-        scheduledFuture.cancel(true);
-        if (scheduler != null) {
-            scheduler.shutdown();
-        }
+        this.resetTimers();
         LOGGER.info("Schema service shutdown.");
     }
 
diff --git 
a/extensions/json-schema/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
 
b/extensions/json-schema/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 9998ad930..399f81944 100644
--- 
a/extensions/json-schema/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ 
b/extensions/json-schema/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -29,13 +29,13 @@
 
     <reference id="scopeService" 
interface="org.apache.unomi.api.services.ScopeService"/>
     <reference id="persistenceService" 
interface="org.apache.unomi.persistence.spi.PersistenceService"/>
-    <!--reference id="schedulerService" 
interface="org.apache.unomi.api.services.SchedulerService"/-->
+    <reference id="schedulerService" 
interface="org.apache.unomi.api.services.SchedulerService"/>
 
     <bean id="schemaServiceImpl" 
class="org.apache.unomi.schema.impl.SchemaServiceImpl" init-method="init"
           destroy-method="destroy">
         <property name="persistenceService" ref="persistenceService"/>
         <property name="scopeService" ref="scopeService"/>
-        <!--property name="schedulerService" ref="schedulerService"/-->
+        <property name="schedulerService" ref="schedulerService"/>
         <property name="jsonSchemaRefreshInterval" 
value="${json.schema.refresh.interval}"/>
     </bean>
     <service id="schemaService" ref="schemaServiceImpl" 
interface="org.apache.unomi.schema.api.SchemaService"/>
diff --git 
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
 
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index a333490ab..e89c2e8f5 100644
--- 
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++ 
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -26,6 +26,8 @@ import org.apache.unomi.api.actions.Action;
 import org.apache.unomi.api.actions.ActionExecutor;
 import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.services.*;
+import org.apache.unomi.api.tasks.ScheduledTask;
+import org.apache.unomi.api.tasks.TaskExecutor;
 import org.apache.unomi.persistence.spi.PersistenceService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,27 +156,63 @@ public class MergeProfilesOnPropertyAction implements 
ActionExecutor {
     }
 
     private void reassignPersistedBrowsingDatasAsync(boolean 
anonymousBrowsing, List<String> mergedProfileIds, String masterProfileId) {
-        schedulerService.getSharedScheduleExecutorService().schedule(new 
TimerTask() {
+        // Register task executor for data reassignment
+        String taskType = "merge-profiles-reassign-data";
+
+        // Create a reusable executor that can handle the parameters
+        TaskExecutor mergeProfilesReassignDataExecutor = new TaskExecutor() {
             @Override
-            public void run() {
-                if (!anonymousBrowsing) {
-                    Condition profileIdsCondition = new 
Condition(definitionsService.getConditionType("eventPropertyCondition"));
-                    
profileIdsCondition.setParameter("propertyName","profileId");
-                    
profileIdsCondition.setParameter("comparisonOperator","in");
-                    profileIdsCondition.setParameter("propertyValues", 
mergedProfileIds);
-
-                    String[] scripts = new String[]{"updateProfileId"};
-                    Map<String, Object>[] scriptParams = new 
Map[]{Collections.singletonMap("profileId", masterProfileId)};
-                    Condition[] conditions = new 
Condition[]{profileIdsCondition};
-
-                    persistenceService.updateWithQueryAndStoredScript(new 
Class[]{Session.class, Event.class}, scripts, scriptParams, conditions, false);
-                } else {
-                    for (String mergedProfileId : mergedProfileIds) {
-                        privacyService.anonymizeBrowsingData(mergedProfileId);
+            public String getTaskType() {
+                return taskType;
+            }
+
+            @Override
+            public void execute(ScheduledTask task, 
TaskExecutor.TaskStatusCallback callback) {
+                try {
+                    Map<String, Object> parameters = task.getParameters();
+                    boolean isAnonymousBrowsing = (boolean) 
parameters.get("anonymousBrowsing");
+                    @SuppressWarnings("unchecked")
+                    List<String> profilesIds = (List<String>) 
parameters.get("mergedProfileIds");
+                    String masterProfile = (String) 
parameters.get("masterProfileId");
+                    // Use only the task's own parameters below, never the enclosing call's captured arguments.
+                    if (!isAnonymousBrowsing) {
+                        Condition profileIdsCondition = new 
Condition(definitionsService.getConditionType("eventPropertyCondition"));
+                        
profileIdsCondition.setParameter("propertyName","profileId");
+                        
profileIdsCondition.setParameter("comparisonOperator","in");
+                        profileIdsCondition.setParameter("propertyValues", 
profilesIds);
+
+                        String[] scripts = new String[]{"updateProfileId"};
+                        Map<String, Object>[] scriptParams = new 
Map[]{Collections.singletonMap("profileId", masterProfile)};
+                        Condition[] conditions = new 
Condition[]{profileIdsCondition};
+
+                        persistenceService.updateWithQueryAndStoredScript(new 
Class[]{Session.class, Event.class}, scripts, scriptParams, conditions, false);
+                    } else {
+                        for (String mergedProfileId : profilesIds) {
+                            
privacyService.anonymizeBrowsingData(mergedProfileId);
+                        }
                     }
+
+                    callback.complete();
+                } catch (Exception e) {
+                    LOGGER.error("Error while reassigning profile data", e);
+                    callback.fail(e.getMessage());
                 }
             }
-        }, 1000, TimeUnit.MILLISECONDS);
+        };
+
+        // Register the executor
+        
schedulerService.registerTaskExecutor(mergeProfilesReassignDataExecutor);
+
+        // Create a one-shot task for async data reassignment
+        schedulerService.newTask(taskType)
+                .withParameters(Map.of(
+                        "anonymousBrowsing", anonymousBrowsing,
+                        "mergedProfileIds", mergedProfileIds,
+                        "masterProfileId", masterProfileId
+                ))
+                .withInitialDelay(1000, TimeUnit.MILLISECONDS)
+                .asOneShot()
+                .schedule();
     }
 
     private void reassignCurrentBrowsingData(Event event, List<Profile> 
existingMergedProfiles, boolean forceEventProfileAsMaster, String 
mergePropName, String mergePropValue) {
diff --git 
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
 
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
index f741d8d5f..9802426e7 100644
--- 
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
+++ 
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
@@ -58,8 +58,8 @@ public class ClusterServiceImpl implements ClusterService {
     private volatile List<ClusterNode> cachedClusterNodes = 
Collections.emptyList();
 
     private BundleWatcher bundleWatcher;
-    private String updateSystemStatsTaskId;
-    private String cleanupStaleNodesTaskId;
+    private String clusterNodeStatisticsUpdateTaskId;
+    private String clusterStaleNodesCleanupTaskId;
 
     /**
      * Max time to wait for persistence service (in milliseconds)
@@ -206,12 +206,10 @@ public class ClusterServiceImpl implements ClusterService 
{
      * This method can be called later if schedulerService wasn't available 
during init.
      */
     public void initializeScheduledTasks() {
-        /* Wait for PR UNOMI-878 to reactivate that code
         if (schedulerService == null) {
             LOGGER.error("Cannot initialize scheduled tasks: SchedulerService 
is not set");
             return;
         }
-        */
 
         // Schedule regular updates of the node statistics
         TimerTask statisticsTask = new TimerTask() {
@@ -224,7 +222,7 @@ public class ClusterServiceImpl implements ClusterService {
                 }
             }
         };
-        updateSystemStatsTaskId = 
schedulerService.createRecurringTask("clusterNodeStatisticsUpdate", 
nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS, statisticsTask, 
false).getItemId();
+        this.clusterNodeStatisticsUpdateTaskId = 
schedulerService.createRecurringTask("clusterNodeStatisticsUpdate", 
nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS, statisticsTask, 
false).getItemId();
 
         // Schedule cleanup of stale nodes
         TimerTask cleanupTask = new TimerTask() {
@@ -237,7 +235,7 @@ public class ClusterServiceImpl implements ClusterService {
                 }
             }
         };
-        cleanupStaleNodesTaskId = 
schedulerService.createRecurringTask("clusterStaleNodesCleanup", 60000, 
TimeUnit.MILLISECONDS, cleanupTask, false).getItemId();
+        this.clusterStaleNodesCleanupTaskId = 
schedulerService.createRecurringTask("clusterStaleNodesCleanup", 60000, 
TimeUnit.MILLISECONDS, cleanupTask, false).getItemId();
 
         LOGGER.info("Cluster service scheduled tasks initialized");
     }
@@ -247,11 +245,13 @@ public class ClusterServiceImpl implements ClusterService 
{
         shutdownNow = true;
 
         // Cancel scheduled tasks
-        if (updateSystemStatsTaskId != null) {
-            schedulerService.cancelTask(updateSystemStatsTaskId);
+        if (schedulerService != null && clusterNodeStatisticsUpdateTaskId != 
null) {
+            schedulerService.cancelTask(clusterNodeStatisticsUpdateTaskId);
+            clusterNodeStatisticsUpdateTaskId = null; // clear the id just cancelled; the cleanup id is handled by the next guard
         }
-        if (cleanupStaleNodesTaskId != null) {
-            schedulerService.cancelTask(cleanupStaleNodesTaskId);
+        if (schedulerService != null && clusterStaleNodesCleanupTaskId != 
null) {
+            schedulerService.cancelTask(clusterStaleNodesCleanupTaskId);
+            clusterStaleNodesCleanupTaskId = null;
         }
 
         // Remove node from persistence service
diff --git 
a/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java
 
b/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java
index 8fa7e1e68..ff7babc2c 100644
--- 
a/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java
+++ 
b/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java
@@ -26,9 +26,9 @@ import org.apache.unomi.api.conditions.ConditionType;
 import org.apache.unomi.api.services.DefinitionsService;
 import org.apache.unomi.api.services.SchedulerService;
 import org.apache.unomi.api.utils.ConditionBuilder;
+import org.apache.unomi.api.utils.ParserHelper;
 import org.apache.unomi.persistence.spi.CustomObjectMapper;
 import org.apache.unomi.persistence.spi.PersistenceService;
-import org.apache.unomi.api.utils.ParserHelper;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.BundleEvent;
@@ -38,17 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimerTask;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
@@ -70,6 +60,8 @@ public class DefinitionsServiceImpl implements 
DefinitionsService, SynchronousBu
 
     private ConditionBuilder conditionBuilder;
     private BundleContext bundleContext;
+    private String reloadTypesTaskId;
+
     public DefinitionsServiceImpl() {
     }
 
@@ -114,10 +106,17 @@ public class DefinitionsServiceImpl implements 
DefinitionsService, SynchronousBu
                 reloadTypes(false);
             }
         };
-        
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 10000, 
definitionsRefreshInterval, TimeUnit.MILLISECONDS);
+        this.resetTypeReloads();
+        this.reloadTypesTaskId = 
schedulerService.createRecurringTask("reloadTypes", definitionsRefreshInterval, 
TimeUnit.MILLISECONDS, task, false).getItemId();
         LOGGER.info("Scheduled task for condition type loading each 10s");
     }
 
+    private void resetTypeReloads() {
+        if (this.reloadTypesTaskId != null) {
+            schedulerService.cancelTask(this.reloadTypesTaskId);
+        }
+    }
+
     public void reloadTypes(boolean refresh) {
         try {
             if (refresh) {
@@ -190,6 +189,7 @@ public class DefinitionsServiceImpl implements 
DefinitionsService, SynchronousBu
     }
 
     public void preDestroy() {
+        this.resetTypeReloads();
         bundleContext.removeBundleListener(this);
         LOGGER.info("Definitions service shutdown.");
     }
diff --git 
a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
 
b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index 7dd5db65f..78ca73959 100644
--- 
a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ 
b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -183,14 +183,15 @@ public class ProfileServiceImpl implements 
ProfileService, SynchronousBundleList
     private Integer purgeSessionExistTime = 0;
     private Integer purgeEventExistTime = 0;
     private Integer purgeProfileInterval = 0;
-    private TimerTask purgeTask = null;
     private long propertiesRefreshInterval = 10000;
 
     private PropertyTypes propertyTypes;
-    private TimerTask propertyTypeLoadTask = null;
 
     private boolean forceRefreshOnSave = false;
 
+    private String propertyTypeLoadTaskId;
+    private String purgeProfilesTaskId;
+
     public ProfileServiceImpl() {
         LOGGER.info("Initializing profile service...");
     }
@@ -241,12 +242,8 @@ public class ProfileServiceImpl implements ProfileService, 
SynchronousBundleList
     }
 
     public void preDestroy() {
-        if (purgeTask != null) {
-            purgeTask.cancel();
-        }
-        if (propertyTypeLoadTask != null) {
-            propertyTypeLoadTask.cancel();
-        }
+        this.resetProfilesPurgeTask();
+        this.resetPropertyTypeLoadTask();
         bundleContext.removeBundleListener(this);
         LOGGER.info("Profile service shutdown.");
     }
@@ -304,14 +301,21 @@ public class ProfileServiceImpl implements 
ProfileService, SynchronousBundleList
     }
 
     private void schedulePropertyTypeLoad() {
-        propertyTypeLoadTask = new TimerTask() {
+        TimerTask task = new TimerTask() {
             @Override
             public void run() {
                 reloadPropertyTypes(false);
             }
         };
-        
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(propertyTypeLoadTask,
 10000, propertiesRefreshInterval, TimeUnit.MILLISECONDS);
-        LOGGER.info("Scheduled task for property type loading each 10s");
+        this.resetPropertyTypeLoadTask();
+        this.propertyTypeLoadTaskId = 
schedulerService.createRecurringTask("propertyTypeLoad", 
propertiesRefreshInterval, TimeUnit.MILLISECONDS, task, false).getItemId();
+        LOGGER.info("Scheduled task for property type loading each {}ms", 
propertiesRefreshInterval);
+    }
+
+    private void resetPropertyTypeLoadTask() {
+        if (this.propertyTypeLoadTaskId != null) {
+            schedulerService.cancelTask(this.propertyTypeLoadTaskId);
+        }
     }
 
     public void reloadPropertyTypes(boolean refresh) {
@@ -410,7 +414,7 @@ public class ProfileServiceImpl implements ProfileService, 
SynchronousBundleList
                 LOGGER.info("Purge: Event items created since more than {} 
days, will be purged", purgeEventExistTime);
             }
 
-            purgeTask = new TimerTask() {
+            TimerTask task = new TimerTask() {
                 @Override
                 public void run() {
                     try {
@@ -429,8 +433,8 @@ public class ProfileServiceImpl implements ProfileService, 
SynchronousBundleList
                     }
                 }
             };
-
-            
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(purgeTask, 1, 
purgeProfileInterval, TimeUnit.DAYS);
+            this.resetProfilesPurgeTask();
+            this.purgeProfilesTaskId = 
schedulerService.createRecurringTask("profilesPurge", purgeProfileInterval, 
TimeUnit.DAYS, task, false).getItemId();
 
             LOGGER.info("Purge: purge scheduled with an interval of {} days", 
purgeProfileInterval);
         } else {
@@ -438,6 +442,12 @@ public class ProfileServiceImpl implements ProfileService, 
SynchronousBundleList
         }
     }
 
+    private void resetProfilesPurgeTask() {
+        if (this.purgeProfilesTaskId != null) {
+            schedulerService.cancelTask(this.purgeProfilesTaskId);
+            this.purgeProfilesTaskId = null;
+        }
+    }
 
     public long getAllProfilesCount() {
         return persistenceService.getAllItemsCount(Profile.ITEM_TYPE);
diff --git 
a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
 
b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
index fe3fe8fb0..71cc75eb5 100644
--- 
a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
+++ 
b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
@@ -71,6 +71,9 @@ public class RulesServiceImpl implements RulesService, 
EventListenerService, Syn
     private Map<String, Set<Rule>> rulesByEventType = new HashMap<>();
     private Boolean optimizedRulesActivated = true;
 
+    private String refreshRulesTaskId;
+    private String syncRuleStatisticsTaskId;
+
     public void setBundleContext(BundleContext bundleContext) {
         this.bundleContext = bundleContext;
     }
@@ -133,11 +136,12 @@ public class RulesServiceImpl implements RulesService, 
EventListenerService, Syn
 
         bundleContext.addBundleListener(this);
 
-        initializeTimers();
+        this.initializeTimers();
         LOGGER.info("Rule service initialized.");
     }
 
     public void preDestroy() {
+        this.resetTimers();
         bundleContext.removeBundleListener(this);
         LOGGER.info("Rule service shutdown.");
     }
@@ -488,25 +492,37 @@ public class RulesServiceImpl implements RulesService, 
EventListenerService, Syn
     }
 
     private void initializeTimers() {
+        this.resetTimers();
         TimerTask task = new TimerTask() {
             @Override
             public void run() {
                 refreshRules();
             }
         };
-        
schedulerService.getScheduleExecutorService().scheduleWithFixedDelay(task, 0, 
rulesRefreshInterval, TimeUnit.MILLISECONDS);
+        this.refreshRulesTaskId = 
schedulerService.createRecurringTask("refreshRules", rulesRefreshInterval,  
TimeUnit.MILLISECONDS, task, false).getItemId();
 
         TimerTask statisticsTask = new TimerTask() {
             @Override
             public void run() {
-                try {
-                    syncRuleStatistics();
-                } catch (Throwable t) {
-                    LOGGER.error("Error synching rule statistics between 
memory and persistence back-end", t);
-                }
+            try {
+                syncRuleStatistics();
+            } catch (Throwable t) {
+                LOGGER.error("Error synching rule statistics between memory 
and persistence back-end", t);
+            }
             }
         };
-        
schedulerService.getScheduleExecutorService().scheduleWithFixedDelay(statisticsTask,
 0, rulesStatisticsRefreshInterval, TimeUnit.MILLISECONDS);
+        this.syncRuleStatisticsTaskId = 
schedulerService.createRecurringTask("syncRuleStatistics", 
rulesStatisticsRefreshInterval,  TimeUnit.MILLISECONDS, statisticsTask, 
false).getItemId();
+    }
+
+    private void resetTimers() {
+        if (refreshRulesTaskId != null) {
+            schedulerService.cancelTask(refreshRulesTaskId);
+            refreshRulesTaskId = null;
+        }
+        if (syncRuleStatisticsTaskId != null) {
+            schedulerService.cancelTask(syncRuleStatisticsTaskId);
+            syncRuleStatisticsTaskId = null;
+        }
     }
 
     public void bundleChanged(BundleEvent event) {
diff --git 
a/services/src/main/java/org/apache/unomi/services/impl/scope/ScopeServiceImpl.java
 
b/services/src/main/java/org/apache/unomi/services/impl/scope/ScopeServiceImpl.java
index 701109d9f..179392a33 100644
--- 
a/services/src/main/java/org/apache/unomi/services/impl/scope/ScopeServiceImpl.java
+++ 
b/services/src/main/java/org/apache/unomi/services/impl/scope/ScopeServiceImpl.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -41,7 +40,7 @@ public class ScopeServiceImpl implements ScopeService {
 
     private ConcurrentMap<String, Scope> scopes = new ConcurrentHashMap<>();
 
-    private ScheduledFuture<?> scheduledFuture;
+    private String refreshScopesTaskId;
 
     public void setPersistenceService(PersistenceService persistenceService) {
         this.persistenceService = persistenceService;
@@ -56,11 +55,11 @@ public class ScopeServiceImpl implements ScopeService {
     }
 
     public void postConstruct() {
-        initializeTimers();
+        this.initializeTimers();
     }
 
     public void preDestroy() {
-        scheduledFuture.cancel(true);
+        this.resetTimers();
     }
 
     @Override
@@ -90,8 +89,15 @@ public class ScopeServiceImpl implements ScopeService {
                 refreshScopes();
             }
         };
-        scheduledFuture = schedulerService.getScheduleExecutorService()
-                .scheduleWithFixedDelay(task, 0, scopesRefreshInterval, 
TimeUnit.MILLISECONDS);
+        this.resetTimers();
+        this.refreshScopesTaskId = 
schedulerService.createRecurringTask("refreshScopes", scopesRefreshInterval, 
TimeUnit.MILLISECONDS, task, false).getItemId();
+    }
+
+    private void resetTimers() {
+        if (refreshScopesTaskId != null) {
+            schedulerService.cancelTask(refreshScopesTaskId);
+            refreshScopesTaskId = null;
+        }
     }
 
     private void refreshScopes() {
diff --git 
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
 
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 1bc8730f4..453203bcd 100644
--- 
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ 
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.unomi.api.*;
 import org.apache.unomi.api.actions.Action;
 import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.conditions.ConditionType;
+import org.apache.unomi.api.exceptions.BadSegmentConditionException;
 import org.apache.unomi.api.query.Query;
 import org.apache.unomi.api.rules.Rule;
 import org.apache.unomi.api.segments.*;
@@ -32,12 +33,11 @@ import org.apache.unomi.api.services.RulesService;
 import org.apache.unomi.api.services.SchedulerService;
 import org.apache.unomi.api.services.SegmentService;
 import org.apache.unomi.api.utils.ConditionBuilder;
+import org.apache.unomi.api.utils.ParserHelper;
 import org.apache.unomi.persistence.spi.CustomObjectMapper;
 import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
 import org.apache.unomi.services.impl.AbstractServiceImpl;
 import org.apache.unomi.services.impl.scheduler.SchedulerServiceImpl;
-import org.apache.unomi.api.utils.ParserHelper;
-import org.apache.unomi.api.exceptions.BadSegmentConditionException;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.BundleEvent;
@@ -83,6 +83,8 @@ public class SegmentServiceImpl extends AbstractServiceImpl 
implements SegmentSe
     private int maximumIdsQueryCount = 5000;
     private boolean pastEventsDisablePartitions = false;
     private int dailyDateExprEvaluationHourUtc = 5;
+    private String recalculatePastEventConditionsTaskId;
+    private String refreshSegmentAndScoringDefinitionsTaskId;
 
     public SegmentServiceImpl() {
         LOGGER.info("Initializing segment service...");
@@ -155,11 +157,12 @@ public class SegmentServiceImpl extends 
AbstractServiceImpl implements SegmentSe
             }
         }
         bundleContext.addBundleListener(this);
-        initializeTimer();
+        this.initializeTimer();
         LOGGER.info("Segment service initialized.");
     }
 
     public void preDestroy() {
+        this.resetTimers();
         bundleContext.removeBundleListener(this);
         LOGGER.info("Segment service shutdown.");
     }
@@ -1196,7 +1199,7 @@ public class SegmentServiceImpl extends 
AbstractServiceImpl implements SegmentSe
     }
 
     private void initializeTimer() {
-
+        this.resetTimers();
         TimerTask task = new TimerTask() {
             @Override
             public void run() {
@@ -1214,7 +1217,7 @@ public class SegmentServiceImpl extends 
AbstractServiceImpl implements SegmentSe
         long period = TimeUnit.DAYS.toSeconds(taskExecutionPeriod);
         LOGGER.info("daily recalculation job for segments and scoring that 
contains date relative conditions will run at fixed rate, " +
                 "initialDelay={}, taskExecutionPeriod={} in seconds", 
initialDelay, period);
-        
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 
initialDelay, period, TimeUnit.SECONDS);
+        this.recalculatePastEventConditionsTaskId = 
schedulerService.createRecurringTask("recalculatePastEventConditions", period, 
TimeUnit.SECONDS, task, false).getItemId();
 
         task = new TimerTask() {
             @Override
@@ -1227,7 +1230,18 @@ public class SegmentServiceImpl extends 
AbstractServiceImpl implements SegmentSe
                 }
             }
         };
-        
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 0, 
segmentRefreshInterval, TimeUnit.MILLISECONDS);
+        this.refreshSegmentAndScoringDefinitionsTaskId = 
schedulerService.createRecurringTask("refreshSegmentAndScoringDefinitions", 
segmentRefreshInterval, TimeUnit.MILLISECONDS, task, false).getItemId();
+    }
+
+    /**
+     * Cancels, through the scheduler service, each of the two tracked tasks
+     * whose id is currently stored, and clears the stored id afterwards so a
+     * later call does not try to cancel the same task again.
+     * Invoked at the start of {@code initializeTimer()} and from
+     * {@code preDestroy()}.
+     */
+    private void resetTimers() {
+        if (this.recalculatePastEventConditionsTaskId != null) {
+            
schedulerService.cancelTask(this.recalculatePastEventConditionsTaskId);
+            this.recalculatePastEventConditionsTaskId = null;
+        }
+        if (this.refreshSegmentAndScoringDefinitionsTaskId != null) {
+            
schedulerService.cancelTask(this.refreshSegmentAndScoringDefinitionsTaskId);
+            this.refreshSegmentAndScoringDefinitionsTaskId = null;
+        }
     }
 
     public void setTaskExecutionPeriod(long taskExecutionPeriod) {
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml 
b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 4e23f5cef..7abef9ac4 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -367,9 +367,7 @@
         <property name="nodeId" value="${cluster.nodeId}"/>
         <property name="nodeStatisticsUpdateFrequency" 
value="${cluster.nodeStatisticsUpdateFrequency}"/>
         <property name="bundleWatcher" ref="bundleWatcher"/>
-        <!-- Wait for UNOMI-878 to be available to activate that
         <property name="schedulerService" ref="schedulerServiceImpl"/>
-        -->
     </bean>
     <service id="clusterService" ref="clusterServiceImpl" 
interface="org.apache.unomi.api.services.ClusterService"/>
 
diff --git 
a/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/CancelTaskCommand.java
 
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/CancelTaskCommand.java
new file mode 100644
index 000000000..2cff5ff5c
--- /dev/null
+++ 
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/CancelTaskCommand.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.shell.commands.scheduler;
+
+import org.apache.karaf.shell.api.action.Action;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Reference;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.unomi.api.services.SchedulerService;
+
+/**
+ * Karaf shell command {@code unomi:task-cancel}: cancels the scheduled task
+ * whose id is passed as the single required argument.
+ */
+@Command(scope = "unomi", name = "task-cancel", description = "Cancels a scheduled task")
+@Service
+public class CancelTaskCommand implements Action {
+
+    @Reference
+    private SchedulerService schedulerService;
+
+    @Argument(index = 0, name = "taskId", description = "The ID of the task to cancel", required = true)
+    private String taskId;
+
+    /**
+     * Cancels the task identified by {@code taskId}.
+     * <p>
+     * The task is looked up first so that an unknown id is reported on the
+     * error stream instead of being answered with a success message.
+     *
+     * @return always {@code null}; the outcome is printed to the console
+     * @throws Exception if the scheduler service call fails
+     */
+    @Override
+    public Object execute() throws Exception {
+        // Same lookup and message as the task-show command uses for unknown ids.
+        if (schedulerService.getTask(taskId) == null) {
+            System.err.println("Task not found: " + taskId);
+            return null;
+        }
+
+        schedulerService.cancelTask(taskId);
+        System.out.println("Task " + taskId + " has been cancelled successfully.");
+        return null;
+    }
+}
diff --git 
a/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/ListTasksCommand.java
 
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/ListTasksCommand.java
new file mode 100644
index 000000000..c6c71a6e1
--- /dev/null
+++ 
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/ListTasksCommand.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.shell.commands.scheduler;
+
+import org.apache.karaf.shell.api.action.Action;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Reference;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.karaf.shell.support.table.Col;
+import org.apache.karaf.shell.support.table.ShellTable;
+import org.apache.unomi.api.PartialList;
+import org.apache.unomi.api.services.SchedulerService;
+import org.apache.unomi.api.tasks.ScheduledTask;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Karaf shell command {@code unomi:task-list}: prints a table of scheduled
+ * tasks, optionally filtered by status and/or type and capped by
+ * {@code --limit}.
+ */
+@Command(scope = "unomi", name = "task-list", description = "Lists scheduled tasks")
+@Service
+public class ListTasksCommand implements Action {
+
+    @Reference
+    private SchedulerService schedulerService;
+
+    @Option(name = "-s", aliases = "--status", description = "Filter by task status (SCHEDULED, RUNNING, COMPLETED, FAILED, CANCELLED, CRASHED)", required = false)
+    private String status;
+
+    @Option(name = "-t", aliases = "--type", description = "Filter by task type", required = false)
+    private String type;
+
+    @Option(name = "--limit", description = "Maximum number of tasks to display (default: 50)", required = false)
+    private int limit = 50;
+
+    /**
+     * Validates the options, collects the matching tasks and prints them as
+     * a table followed by a one-line summary.
+     *
+     * @return always {@code null}; all output goes to the console
+     * @throws Exception if a scheduler service call fails
+     */
+    @Override
+    public Object execute() throws Exception {
+        // A non-positive limit previously ended in an exception from subList().
+        if (limit <= 0) {
+            System.err.println("Invalid limit: " + limit + " (must be greater than 0)");
+            return null;
+        }
+
+        ScheduledTask.TaskStatus taskStatus = null;
+        if (status != null) {
+            try {
+                // Locale.ROOT keeps the upper-casing independent of the JVM
+                // default locale (e.g. Turkish dotted/dotless i).
+                taskStatus = ScheduledTask.TaskStatus.valueOf(status.toUpperCase(Locale.ROOT));
+            } catch (IllegalArgumentException e) {
+                System.err.println("Invalid status: " + status);
+                return null;
+            }
+        }
+
+        List<ScheduledTask> tasks = collectTasks(taskStatus);
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        ShellTable table = new ShellTable();
+
+        // Configure table columns
+        table.column(new Col("ID").maxSize(36));
+        table.column(new Col("Type").maxSize(30));
+        table.column(new Col("Status").maxSize(10));
+        table.column(new Col("Next Run").maxSize(19));
+        table.column(new Col("Last Run").maxSize(19));
+        table.column(new Col("Failures").alignRight());
+        table.column(new Col("Successes").alignRight());
+        table.column(new Col("Total Exec").alignRight());
+        table.column(new Col("Persistent").maxSize(10));
+
+        // Add rows to table
+        for (ScheduledTask task : tasks) {
+            int totalExecutions = task.getSuccessCount() + task.getFailureCount();
+
+            table.addRow().addContent(
+                task.getItemId(),
+                task.getTaskType(),
+                task.getStatus(),
+                task.getNextScheduledExecution() != null ? dateFormat.format(task.getNextScheduledExecution()) : "-",
+                task.getLastExecutionDate() != null ? dateFormat.format(task.getLastExecutionDate()) : "-",
+                task.getFailureCount(),
+                task.getSuccessCount(),
+                totalExecutions,
+                task.isPersistent() ? "Storage" : "Memory"
+            );
+        }
+
+        table.print(System.out);
+
+        if (tasks.isEmpty()) {
+            System.out.println("No tasks found.");
+        } else {
+            int persistentCount = (int) tasks.stream().filter(ScheduledTask::isPersistent).count();
+            int memoryCount = tasks.size() - persistentCount;
+            System.out.println("\nShowing " + tasks.size() + " task(s) (" +
+                persistentCount + " in storage, " + memoryCount + " in memory)" +
+                (status != null ? " with status " + status : "") +
+                (type != null ? " of type " + type : ""));
+        }
+
+        return null;
+    }
+
+    /**
+     * Gathers the tasks to display, applying every requested filter and the
+     * display limit.
+     * <p>
+     * Stored tasks are fetched with the status query when a status is given,
+     * otherwise with the type query when a type is given; memory tasks are
+     * appended in both cases. With no filter, all tasks are fetched at once.
+     * Both filters are then applied to every candidate, so a {@code --type}
+     * given together with {@code --status} is honoured (it is applied to the
+     * fetched page of stored tasks, so fewer than {@code limit} rows may be
+     * shown even if more matches exist in storage).
+     *
+     * @param taskStatus the parsed status filter, or {@code null} for none
+     * @return a new list holding at most {@code limit} matching tasks
+     */
+    private List<ScheduledTask> collectTasks(ScheduledTask.TaskStatus taskStatus) {
+        List<ScheduledTask> candidates;
+        if (taskStatus != null) {
+            PartialList<ScheduledTask> stored = schedulerService.getTasksByStatus(taskStatus, 0, limit, null);
+            // Copy: the list belongs to the PartialList and must not be modified here.
+            candidates = new ArrayList<>(stored.getList());
+            candidates.addAll(schedulerService.getMemoryTasks());
+        } else if (type != null) {
+            PartialList<ScheduledTask> stored = schedulerService.getTasksByType(type, 0, limit, null);
+            candidates = new ArrayList<>(stored.getList());
+            candidates.addAll(schedulerService.getMemoryTasks());
+        } else {
+            // Get all tasks from both storage and memory
+            candidates = schedulerService.getAllTasks();
+        }
+
+        List<ScheduledTask> selected = new ArrayList<>();
+        for (ScheduledTask task : candidates) {
+            if (selected.size() >= limit) {
+                break;
+            }
+            if (taskStatus != null && task.getStatus() != taskStatus) {
+                continue;
+            }
+            // type.equals(...) rather than the reverse: tolerates a null task type.
+            if (type != null && !type.equals(task.getTaskType())) {
+                continue;
+            }
+            selected.add(task);
+        }
+        return selected;
+    }
+}
diff --git 
a/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/PurgeTasksCommand.java
 
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/PurgeTasksCommand.java
new file mode 100644
index 000000000..3cd357cc5
--- /dev/null
+++ 
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/PurgeTasksCommand.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.shell.commands.scheduler;
+
+import org.apache.karaf.shell.api.action.Action;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Reference;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.karaf.shell.api.console.Session;
+import org.apache.unomi.api.services.SchedulerService;
+import org.apache.unomi.api.tasks.ScheduledTask;
+
+import java.util.Calendar;
+import java.util.Date;
+
+/**
+ * Karaf shell command {@code unomi:task-purge}: cancels completed tasks whose
+ * last execution is older than a configurable number of days, after an
+ * optional confirmation prompt.
+ */
+@Command(scope = "unomi", name = "task-purge", description = "Purges old completed tasks")
+@Service
+public class PurgeTasksCommand implements Action {
+
+    /** Number of tasks requested per query while scanning completed tasks. */
+    private static final int BATCH_SIZE = 100;
+
+    @Reference
+    private SchedulerService schedulerService;
+
+    @Reference
+    private Session session;
+
+    @Option(name = "-d", aliases = "--days", description = "Number of days to keep completed tasks (default: 7)", required = false)
+    private int daysToKeep = 7;
+
+    @Option(name = "-f", aliases = "--force", description = "Skip confirmation prompt", required = false)
+    private boolean force = false;
+
+    /**
+     * Asks for confirmation (unless {@code --force} is set), then cancels
+     * every COMPLETED task whose last execution date lies before
+     * "now minus {@code daysToKeep} days". Tasks without a last execution
+     * date are left alone.
+     *
+     * @return always {@code null}; the outcome is printed to the console
+     * @throws Exception if reading the confirmation or a scheduler call fails
+     */
+    @Override
+    public Object execute() throws Exception {
+        // A negative value would move the cutoff into the future and select
+        // every completed task.
+        if (daysToKeep < 0) {
+            System.err.println("Invalid number of days: " + daysToKeep + " (must be 0 or greater)");
+            return null;
+        }
+
+        if (!force) {
+            String response = session.readLine(
+                "This will permanently delete all completed tasks older than " + daysToKeep + " days. Continue? (y/n): ",
+                null
+            );
+            if (!"y".equalsIgnoreCase(response != null ? response.trim() : "n")) {
+                System.out.println("Operation cancelled.");
+                return null;
+            }
+        }
+
+        // Calculate cutoff date
+        Calendar cal = Calendar.getInstance();
+        cal.add(Calendar.DAY_OF_MONTH, -daysToKeep);
+        Date cutoffDate = cal.getTime();
+
+        // Phase 1: page through the completed tasks without modifying anything.
+        // Cancelling while paging would change the result set under the offset
+        // and could cause tasks to be skipped.
+        // (Fully-qualified name: this file does not import java.util.ArrayList.)
+        var staleTaskIds = new java.util.ArrayList<String>();
+        int offset = 0;
+        while (true) {
+            var tasks = schedulerService.getTasksByStatus(ScheduledTask.TaskStatus.COMPLETED, offset, BATCH_SIZE, null);
+            for (ScheduledTask task : tasks.getList()) {
+                if (task.getLastExecutionDate() != null && task.getLastExecutionDate().before(cutoffDate)) {
+                    staleTaskIds.add(task.getItemId());
+                }
+            }
+
+            // An empty or short page means the end of the result set.
+            if (tasks.getList().size() < BATCH_SIZE) {
+                break;
+            }
+            offset += BATCH_SIZE;
+        }
+
+        // Phase 2: cancel the collected tasks.
+        // NOTE(review): the prompt says "permanently delete" but the only call
+        // made is cancelTask(); confirm that cancelling removes the task.
+        for (String staleTaskId : staleTaskIds) {
+            schedulerService.cancelTask(staleTaskId);
+        }
+
+        System.out.println("Successfully purged " + staleTaskIds.size() + " completed tasks older than " + daysToKeep + " days.");
+        return null;
+    }
+}
diff --git 
a/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/RetryTaskCommand.java
 
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/RetryTaskCommand.java
new file mode 100644
index 000000000..bcdf26e03
--- /dev/null
+++ 
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/RetryTaskCommand.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.shell.commands.scheduler;
+
+import org.apache.karaf.shell.api.action.Action;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Reference;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.unomi.api.services.SchedulerService;
+
+/**
+ * Karaf shell command {@code unomi:task-retry}: asks the scheduler service to
+ * retry the task whose id is passed as argument, optionally resetting its
+ * failure count first.
+ */
+@Command(scope = "unomi", name = "task-retry", description = "Retries a failed task")
+@Service
+public class RetryTaskCommand implements Action {
+
+    @Reference
+    private SchedulerService schedulerService;
+
+    @Argument(index = 0, name = "taskId", description = "The ID of the task to retry", required = true)
+    private String taskId;
+
+    @Option(name = "-r", aliases = "--reset", description = "Reset failure count before retrying")
+    private boolean resetFailureCount = false;
+
+    /**
+     * Requests a retry of the task identified by {@code taskId}.
+     * <p>
+     * The task is looked up first so that an unknown id is reported on the
+     * error stream instead of being answered with a "queued" message.
+     *
+     * @return always {@code null}; the outcome is printed to the console
+     * @throws Exception if the scheduler service call fails
+     */
+    @Override
+    public Object execute() throws Exception {
+        // Same lookup and message as the task-show command uses for unknown ids.
+        if (schedulerService.getTask(taskId) == null) {
+            System.err.println("Task not found: " + taskId);
+            return null;
+        }
+
+        schedulerService.retryTask(taskId, resetFailureCount);
+        System.out.println("Task " + taskId + " has been queued for retry" +
+            (resetFailureCount ? " with reset failure count." : "."));
+        return null;
+    }
+}
diff --git 
a/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/SetExecutorNodeCommand.java
 
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/SetExecutorNodeCommand.java
new file mode 100644
index 000000000..c52e70873
--- /dev/null
+++ 
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/SetExecutorNodeCommand.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.shell.commands.scheduler;
+
+import org.apache.karaf.shell.api.action.Action;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Reference;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.unomi.api.services.SchedulerService;
+
+/**
+ * Karaf shell command {@code unomi:task-executor}: shows whether this node
+ * executes scheduled tasks. Changing the setting is accepted as an argument
+ * but is not wired to the scheduler service in this class.
+ */
+@Command(scope = "unomi", name = "task-executor", description = "Shows or changes task executor status for this node")
+@Service
+public class SetExecutorNodeCommand implements Action {
+
+    @Reference
+    private SchedulerService schedulerService;
+
+    @Argument(index = 0, name = "enable", description = "Enable (true) or disable (false) task execution", required = false)
+    private String enable;
+
+    /**
+     * Without argument, prints the current executor status and the node id.
+     * With an argument, validates it and reports that the status cannot be
+     * changed from this command, together with the unchanged current status.
+     *
+     * @return always {@code null}; all output goes to the console
+     * @throws Exception if a scheduler service call fails
+     */
+    @Override
+    public Object execute() throws Exception {
+        String currentStatus = schedulerService.isExecutorNode() ? "ENABLED" : "DISABLED";
+
+        if (enable == null) {
+            // Just show current status
+            System.out.println("Task executor status: " + currentStatus);
+            System.out.println("Node ID: " + schedulerService.getNodeId());
+            return null;
+        }
+
+        // Boolean.parseBoolean() would silently map any typo to "false".
+        if (!"true".equalsIgnoreCase(enable) && !"false".equalsIgnoreCase(enable)) {
+            System.err.println("Invalid value: " + enable + " (expected true or false)");
+            return null;
+        }
+
+        // TODO: call schedulerService.setExecutorNode(Boolean.parseBoolean(enable))
+        // once the service exposes such a method. Until then nothing is changed,
+        // so the command must not claim that it was.
+        System.err.println("Changing the task executor status is not supported by this command; node " +
+            schedulerService.getNodeId() + " remains " + currentStatus + ".");
+        return null;
+    }
+}
diff --git 
a/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/ShowTaskCommand.java
 
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/ShowTaskCommand.java
new file mode 100644
index 000000000..e59172162
--- /dev/null
+++ 
b/tools/shell-dev-commands/src/main/java/org/apache/unomi/shell/commands/scheduler/ShowTaskCommand.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.shell.commands.scheduler;
+
+import org.apache.karaf.shell.api.action.Action;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Reference;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.unomi.api.services.SchedulerService;
+import org.apache.unomi.api.tasks.ScheduledTask;
+
+import java.text.SimpleDateFormat;
+import java.util.Map;
+
+/**
+ * Karaf shell command {@code unomi:task-show}: prints the details of the
+ * scheduled task whose id is passed as argument.
+ */
+@Command(scope = "unomi", name = "task-show", description = "Shows detailed information about a task")
+@Service
+public class ShowTaskCommand implements Action {
+
+    @Reference
+    private SchedulerService schedulerService;
+
+    @Argument(index = 0, name = "taskId", description = "The ID of the task to show", required = true)
+    private String taskId;
+
+    /**
+     * Looks the task up and prints its basic, timing and execution
+     * information, followed by its parameters and checkpoint data when
+     * present. An unknown id is reported on the error stream.
+     *
+     * @return always {@code null}; all output goes to the console
+     * @throws Exception if the scheduler service call fails
+     */
+    @Override
+    public Object execute() throws Exception {
+        ScheduledTask task = schedulerService.getTask(taskId);
+        if (task == null) {
+            System.err.println("Task not found: " + taskId);
+            return null;
+        }
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+        // Print basic information
+        printHeading("Task Details");
+        System.out.println("ID: " + task.getItemId());
+        System.out.println("Type: " + task.getTaskType());
+        System.out.println("Status: " + task.getStatus());
+        System.out.println("Persistent: " + task.isPersistent());
+        System.out.println("Parallel Execution: " + task.isAllowParallelExecution());
+        System.out.println("Fixed Rate: " + task.isFixedRate());
+        System.out.println("One Shot: " + task.isOneShot());
+
+        // Print timing information
+        if (task.getNextScheduledExecution() != null) {
+            System.out.println("Next Run: " + dateFormat.format(task.getNextScheduledExecution()));
+        }
+        if (task.getLastExecutionDate() != null) {
+            System.out.println("Last Run: " + dateFormat.format(task.getLastExecutionDate()));
+        }
+        System.out.println("Initial Delay: " + task.getInitialDelay() + " " + task.getTimeUnit());
+        System.out.println("Period: " + task.getPeriod() + " " + task.getTimeUnit());
+
+        // Print execution information (the success count mirrors what task-list shows)
+        System.out.println("Success Count: " + task.getSuccessCount());
+        System.out.println("Failure Count: " + task.getFailureCount());
+        if (task.getLastError() != null) {
+            System.out.println("Last Error: " + task.getLastError());
+        }
+
+        printSection("Parameters", task.getParameters());
+        printSection("Checkpoint Data", task.getCheckpointData());
+
+        return null;
+    }
+
+    /**
+     * Prints a title followed by a dashed underline of the same length.
+     *
+     * @param title the heading text
+     */
+    private static void printHeading(String title) {
+        StringBuilder underline = new StringBuilder(title.length());
+        for (int i = 0; i < title.length(); i++) {
+            underline.append('-');
+        }
+        System.out.println(title);
+        System.out.println(underline);
+    }
+
+    /**
+     * Prints a blank line, a heading and one {@code key: value} line per
+     * entry. Prints nothing at all when the map is {@code null} or empty.
+     *
+     * @param title   the heading of the section
+     * @param entries the values to list, may be {@code null}
+     */
+    private static void printSection(String title, Map<String, Object> entries) {
+        if (entries == null || entries.isEmpty()) {
+            return;
+        }
+        System.out.println();
+        printHeading(title);
+        for (Map.Entry<String, Object> entry : entries.entrySet()) {
+            System.out.println(entry.getKey() + ": " + entry.getValue());
+        }
+    }
+}

Reply via email to