This is an automated email from the ASF dual-hosted git repository. joerghoh pushed a commit to branch SLING-13044 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-event.git
commit f1cf972480835d73222af664eaf7ef0799929617 Author: Joerg Hoh <[email protected]> AuthorDate: Thu Jan 8 16:43:31 2026 +0100 SLING-13044 reschedule jobs via dedicated threadpool instead of timers --- .../sling/event/impl/jobs/queues/JobQueueImpl.java | 23 ++---- .../impl/jobs/queues/JobReschedulingManager.java | 88 ++++++++++++++++++++++ .../sling/event/impl/jobs/queues/QueueManager.java | 4 + .../event/impl/jobs/queues/QueueServices.java | 2 + .../event/impl/jobs/queues/JobQueueImplTest.java | 8 ++ 5 files changed, 110 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java index 4447da1..d085719 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java @@ -23,8 +23,6 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -85,6 +83,8 @@ public class JobQueueImpl implements Queue { private final ThreadPool threadPool; + private final JobReschedulingManager jobReschedulingManager; + /** Async counter. */ private final AtomicInteger asyncCounter = new AtomicInteger(); @@ -175,17 +175,19 @@ public class JobQueueImpl implements Queue { final QueueServices services, final QueueJobCache cache, final OutdatedJobQueueInfo outdatedQueue) { + this.queueName = name; + this.configuration = config; + if (config.getOwnThreadPoolSize() > 0) { this.threadPool = new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize()); } else { this.threadPool = services.eventingThreadPool; } - this.queueName = name; - this.configuration = config; this.services = services; this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name); this.running = true; this.cache = cache; + this.jobReschedulingManager = services.reschedulingManager; this.maxParallel = config.getMaxParallel(); if (outdatedQueue == null) { // queue is created the first time @@ -802,7 +804,7 @@ public class JobQueueImpl implements Queue { this.isSleepingUntil = fireDate.getTime(); } - final Runnable t = new Runnable() { + final Runnable task = new Runnable() { @Override public void run() { try { @@ -820,16 +822,7 @@ public class JobQueueImpl implements Queue { } }; this.waitCounter.incrementAndGet(); - final Timer timer = new Timer(); - timer.schedule( - new TimerTask() { - - @Override - public void run() { - t.run(); - } - }, - delay); + this.jobReschedulingManager.reschedule(task, delay); } else { // put directly into queue this.requeue(handler); diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobReschedulingManager.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobReschedulingManager.java new file mode 100644 index 0000000..c7f184e --- /dev/null +++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobReschedulingManager.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl.jobs.queues; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.metatype.annotations.AttributeDefinition; +import org.osgi.service.metatype.annotations.Designate; +import org.osgi.service.metatype.annotations.ObjectClassDefinition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A centralized service to manage the delay after which tasks are executed. + * + * This service is used to reschedule failed jobs, which are submitted to the queue again + * after the delay has passed. For that reason the threadpool to manage that delay is quite small, + * as the re-submission of the jobs into their respective queues is fast. + */ +@Component(service = JobReschedulingManager.class) +@Designate(ocd = JobReschedulingManager.Config.class) +public class JobReschedulingManager { + + @ObjectClassDefinition(name = "Apache Sling Job Rescheduling Manager") + public @interface Config { + + @AttributeDefinition(name = "thread pool size", description = "number of threads to execute the rescheduling") + public int threadCount() default 1; + } + + private static final Logger logger = LoggerFactory.getLogger(JobReschedulingManager.class); + + private ScheduledThreadPoolExecutor executor; + + @Activate + public void activate(Config config) { + executor = new ScheduledThreadPoolExecutor(config.threadCount(), new ThreadFactory() { + private final AtomicInteger threadNumber = new AtomicInteger(1); + + @Override + public Thread newThread(final Runnable r) { + final Thread t = new Thread(r, "sling-job-rescheduler-" + threadNumber.getAndIncrement()); + t.setDaemon(true); + t.setUncaughtExceptionHandler((Thread thread, Throwable e) -> { + logger.error("Thread '" + thread.getName() + "' terminated unexpectedly", e); + }); + return t; + } + }); + } + + @Deactivate + public void deactivate() { + // submitted tasks which are not yet executed will be dropped (which is fine here) + executor.shutdown(); + } + + /** + * Start the provided task with a delay in a fire & forgot way + * @param task the task to execute + * @param delayInMilis the delay in milliseconds + */ + public void reschedule(Runnable task, long delayInMilis) { + executor.schedule(task, delayInMilis, TimeUnit.MILLISECONDS); + } +} diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java index 127def4..80c213f 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java @@ -115,6 +115,9 @@ public class QueueManager implements Runnable, EventHandler, ConfigurationChange @Reference(policyOption = ReferencePolicyOption.GREEDY) private ThreadPoolManager threadPoolManager; + @Reference(policyOption = ReferencePolicyOption.GREEDY) + private JobReschedulingManager jobReschedulingManager; + /** * Our thread pool. */ @@ -164,6 +167,7 @@ public class QueueManager implements Runnable, EventHandler, ConfigurationChange queueServices.threadPoolManager = this.threadPoolManager; queueServices.statisticsManager = statisticsManager; queueServices.eventingThreadPool = this.threadPool; + queueServices.reschedulingManager = this.jobReschedulingManager; this.configuration.addListener(this); logger.info("Apache Sling Queue Manager started on instance {}", Environment.APPLICATION_ID); } diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java index 31beab2..4250b49 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java @@ -43,4 +43,6 @@ public class QueueServices { public StatisticsManager statisticsManager; public ThreadPool eventingThreadPool; + + public JobReschedulingManager reschedulingManager; } diff --git a/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java b/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java index 132118a..d9c83f0 100644 --- a/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java +++ b/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java @@ -27,6 +27,7 @@ import org.apache.sling.event.impl.jobs.JobHandler; import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration; import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration; import org.apache.sling.event.impl.jobs.stats.StatisticsManager; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.InjectMocks; @@ -71,6 +72,13 @@ public class JobQueueImplTest { jobQueue = new JobQueueImpl(testQueue, internalConfig, services, cache, null); } + @After + public void tearDown() { + if (jobQueue != null) { + jobQueue.close(); + } + } + @Test public void testStartJobsWhenDisabled() { // Add a job handler to the queue
