Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632141&r1=1632140&r2=1632141&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Wed Oct 15 17:55:52 2014 @@ -19,7 +19,6 @@ package org.apache.sling.event.impl.jobs.queues; import java.util.ArrayList; -import java.util.Collection; import java.util.Dictionary; import java.util.HashMap; import java.util.Hashtable; @@ -30,18 +29,16 @@ import java.util.concurrent.atomic.Atomi import java.util.concurrent.atomic.AtomicInteger; import org.apache.sling.commons.threads.ThreadPool; -import org.apache.sling.commons.threads.ThreadPoolManager; import org.apache.sling.event.EventUtil; import org.apache.sling.event.impl.EventingThreadPool; import org.apache.sling.event.impl.jobs.InternalJobState; -import org.apache.sling.event.impl.jobs.JobConsumerManager; import org.apache.sling.event.impl.jobs.JobExecutionResultImpl; import org.apache.sling.event.impl.jobs.JobHandler; import org.apache.sling.event.impl.jobs.JobImpl; +import org.apache.sling.event.impl.jobs.TestLogger; import org.apache.sling.event.impl.jobs.Utility; import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration; import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier; -import org.apache.sling.event.impl.jobs.stats.StatisticsImpl; import org.apache.sling.event.impl.support.Environment; import org.apache.sling.event.impl.support.ResourceHelper; import org.apache.sling.event.jobs.Job; @@ -52,7 +49,6 @@ import org.apache.sling.event.jobs.consu 
import org.apache.sling.event.jobs.consumer.JobExecutionResult; import org.apache.sling.event.jobs.consumer.JobExecutor; import org.osgi.service.event.Event; -import org.osgi.service.event.EventAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,97 +57,97 @@ import org.slf4j.LoggerFactory; * functionality for the job event handling. */ public abstract class AbstractJobQueue - extends StatisticsImpl - implements JobStatusNotifier, Queue { - - /** Default number of seconds to wait for an ack. */ - private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60 * 1000; // by default we wait 60 secs + implements Queue, JobStatusNotifier { /** Default timeout for suspend. */ private static final long MAX_SUSPEND_TIME = 1000 * 60 * 60; // 60 mins + /** Default number of seconds to wait for an ack. */ + private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60 * 1000; // by default we wait 60 secs + /** The logger. */ protected final Logger logger; /** Configuration. */ protected final InternalQueueConfiguration configuration; - /** The event admin. */ - private final EventAdmin eventAdmin; - - /** The job consumer manager. */ - private final JobConsumerManager jobConsumerManager; - /** The queue name. */ protected volatile String queueName; /** Are we still running? */ protected volatile boolean running; - /** Is the queue currently waiting(sleeping) */ - protected volatile boolean isWaiting = false; - - /** The map of events we're have started (send). */ - private final Map<String, JobHandler> startedJobsLists = new HashMap<String, JobHandler>(); - - /** The map of events we're processing. */ - private final Map<String, JobHandler> processsingJobsLists = new HashMap<String, JobHandler>(); - /** Suspended since. */ private volatile long suspendedSince = -1L; /** Suspend lock. */ private final Object suspendLock = new Object(); + /** Services used by the queues. */ + protected final QueueServices services; + + /** The map of events we're processing. 
*/ + private final Map<String, JobHandler> processingJobsLists = new HashMap<String, JobHandler>(); + + private final ThreadPool threadPool; + + /** The map of events we have started (sent). */ + private final Map<String, JobHandler> startedJobsLists = new HashMap<String, JobHandler>(); + /** Async counter. */ private final AtomicInteger asyncCounter = new AtomicInteger(); + /** Is the queue currently waiting(sleeping) */ + protected volatile boolean isWaiting = false; + /** Flag for outdated. */ private final AtomicBoolean isOutdated = new AtomicBoolean(false); - /** Marker flag if the queue is waiting for another element (= empty) */ - protected boolean isWaitingForNext = false; - /** A marker for closing the queue. */ private final AtomicBoolean closeMarker = new AtomicBoolean(false); - private final ThreadPool threadPool; - /** - * Start this queue + * Create a new queue * @param name The queue name * @param config The queue configuration - * @param environment The environment component */ public AbstractJobQueue(final String name, - final InternalQueueConfiguration config, - final JobConsumerManager jobConsumerManager, - final ThreadPoolManager threadPoolManager, - final EventAdmin eventAdmin) { + final InternalQueueConfiguration config, + final QueueServices services) { if ( config.getOwnThreadPoolSize() > 0 ) { - this.threadPool = new EventingThreadPool(threadPoolManager, config.getOwnThreadPoolSize()); + this.threadPool = new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize()); } else { this.threadPool = Environment.THREAD_POOL; } this.queueName = name; this.configuration = config; - this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name); + this.services = services; + this.logger = new TestLogger(LoggerFactory.getLogger(this.getClass().getName() + '.' 
+ name)); this.running = true; - this.eventAdmin = eventAdmin; - this.jobConsumerManager = jobConsumerManager; } /** - * @see org.apache.sling.event.jobs.Queue#getStateInfo() + * Return the queue configuration */ @Override - public String getStateInfo() { - synchronized ( this.suspendLock ) { - return "isWaiting=" + this.isWaiting + - ", suspendedSince=" + this.suspendedSince + - ", isWaitingForNext=" + this.isWaitingForNext + - ", asyncJobs=" + this.asyncCounter.get(); - } + public InternalQueueConfiguration getConfiguration() { + return this.configuration; + } + + /** + * Get the name of the job queue. + */ + @Override + public String getName() { + return this.queueName; + } + + /** + * @see org.apache.sling.event.jobs.Queue#getStatistics() + */ + @Override + public Statistics getStatistics() { + return this.services.statisticsManager.getQueueStatistics(this.queueName); } /** @@ -180,37 +176,22 @@ public abstract class AbstractJobQueue } /** - * Return the queue configuration + * Is the queue outdated? */ - @Override - public InternalQueueConfiguration getConfiguration() { - return this.configuration; + protected boolean isOutdated() { + return this.isOutdated.get(); } /** - * Close this queue. + * Outdate this queue. 
*/ - public void close() { - this.running = false; - this.logger.debug("Shutting down job queue {}", queueName); - this.resume(); - if ( this.isWaiting ) { - this.logger.debug("Waking up waiting queue {}", this.queueName); - this.notifyFinished(null); - } - // continue queue processing to stop the queue - this.put(new JobHandler(null, null)); - - synchronized ( this.processsingJobsLists ) { - this.processsingJobsLists.clear(); - } - synchronized ( this.startedJobsLists ) { - this.startedJobsLists.clear(); - } - if ( this.configuration.getOwnThreadPoolSize() > 0 ) { - ((EventingThreadPool)this.threadPool).release(); + public void outdate() { + if ( !this.isOutdated() ) { + this.isOutdated.set(true); + final String name = this.getName() + "<outdated>(" + this.hashCode() + ")"; + this.logger.info("Outdating queue {}, renaming to {}.", this.queueName, name); + this.queueName = name; } - this.logger.info("Stopped job queue {}", this.queueName); } /** @@ -234,7 +215,34 @@ public abstract class AbstractJobQueue * Check whether this queue can be closed */ protected boolean canBeClosed() { - return this.isEmpty() && !this.isWaiting && !this.isSuspended() && this.asyncCounter.get() == 0 && this.isWaitingForNext; + return !this.isWaiting && !this.isSuspended() && this.asyncCounter.get() == 0; + } + + /** + * Close this queue. 
+ */ + public void close() { + this.running = false; + this.logger.debug("Shutting down job queue {}", queueName); + this.resume(); + if ( this.isWaiting ) { + this.logger.debug("Waking up waiting queue {}", this.queueName); + this.notifyFinished(false); + } + // continue queue processing to stop the queue + this.services.topicManager.stop(this.getName()); + + synchronized ( this.processingJobsLists ) { + this.processingJobsLists.clear(); + } + synchronized ( this.startedJobsLists ) { + this.startedJobsLists.clear(); + } + if ( this.configuration.getOwnThreadPoolSize() > 0 ) { + ((EventingThreadPool)this.threadPool).release(); + } + + this.logger.info("Stopped job queue {}", this.queueName); } /** @@ -275,12 +283,10 @@ public abstract class AbstractJobQueue process = this.startedJobsLists.remove(info.getJob().getId()) != null; } if ( process ) { - if ( !info.reschedule() ) { - this.decQueued(); - checkForNotify(null); - } else { + if ( info.reschedule() ) { this.logger.info("No acknowledge received for job {} stored at {}. 
Requeueing job.", Utility.toString(info.getJob()), info.getJob().getId()); - checkForNotify(info); + this.reschedule(info); + this.notifyFinished(true); } } } @@ -288,30 +294,288 @@ public abstract class AbstractJobQueue } /** - * @see org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event) + * Execute the queue */ - @Override - public boolean sendAcknowledge(final Event job) { - final String jobId = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_ID); - final JobHandler ack; - synchronized ( this.startedJobsLists ) { - ack = this.startedJobsLists.remove(jobId); + private void runJobQueue() { + while ( this.running ) { + JobHandler info = null; + if ( info == null ) { + // so let's wait/get the next job from the queue + info = this.take(); + } + + // if we're suspended we drop the current item + if ( this.running && info != null && !checkSuspended() ) { + // if we still have a job and are running, let's go + this.start(info); + } } - // if the event is still in the processing list, we confirm the ack - if ( ack != null ) { + } + + private JobHandler take() { + return this.services.topicManager.take(this.getName()); + } + + /** + * Check if the queue is suspended and go into suspend mode + */ + private boolean checkSuspended() { + boolean wasSuspended = false; + synchronized ( this.suspendLock ) { + while ( this.suspendedSince != -1 ) { + logger.debug("Sleeping as queue {} is suspended.", this.getName()); + wasSuspended = true; + final long diff = System.currentTimeMillis() - this.suspendedSince; + try { + this.suspendLock.wait(MAX_SUSPEND_TIME - diff); + } catch (final InterruptedException ignore) { + this.ignoreException(ignore); + Thread.currentThread().interrupt(); + } + logger.debug("Waking up queue {}.", this.getName()); + if ( System.currentTimeMillis() > this.suspendedSince + MAX_SUSPEND_TIME ) { + this.resume(); + } + } + } + return wasSuspended; + } + + /** + * Execute a job + */ + protected boolean 
executeJob(final JobHandler handler) { + final JobImpl job = handler.getJob(); + final JobExecutor consumer = this.services.jobConsumerManager.getExecutor(job.getTopic()); + + if ( (consumer != null || (job.isBridgedEvent() && this.services.jobConsumerManager.supportsBridgedEvents())) ) { + final boolean success = this.startJobExecution(handler, consumer); + return success; + } else { + // no consumer on this instance, assign to another instance + handler.reassign(); + return false; + } + } + + private boolean startJobExecution(final JobHandler handler, final JobExecutor consumer) { + final JobImpl job = handler.getJob(); + if ( handler.startProcessing(this) ) { if ( logger.isDebugEnabled() ) { - logger.debug("Received ack for job {}", Utility.toString(ack.getJob())); + logger.debug("Starting job {}", Utility.toString(job)); } - final long queueTime = ack.started - ack.queued; - this.addActive(queueTime); - Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, ack.getJob(), queueTime); - synchronized ( this.processsingJobsLists ) { - this.processsingJobsLists.put(jobId, ack); + try { + handler.started = System.currentTimeMillis(); + + if ( consumer != null ) { + final long queueTime = handler.started - handler.queued; + Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime); + synchronized ( this.processingJobsLists ) { + this.processingJobsLists.put(job.getId(), handler); + } + + final Runnable task = new Runnable() { + + /** + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + final Object lock = new Object(); + final Thread currentThread = Thread.currentThread(); + // update priority and name + final String oldName = currentThread.getName(); + final int oldPriority = currentThread.getPriority(); + + currentThread.setName(oldName + "-" + job.getQueueName() + "(" + job.getTopic() + ")"); + if ( configuration.getThreadPriority() != null ) { + switch ( 
configuration.getThreadPriority() ) { + case NORM : currentThread.setPriority(Thread.NORM_PRIORITY); + break; + case MIN : currentThread.setPriority(Thread.MIN_PRIORITY); + break; + case MAX : currentThread.setPriority(Thread.MAX_PRIORITY); + break; + } + } + JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED; + Job.JobState resultState = Job.JobState.ERROR; + final AtomicBoolean isAsync = new AtomicBoolean(false); + + try { + synchronized ( lock ) { + final JobExecutionContext ctx = new JobExecutionContext() { + + private boolean hasInit = false; + + @Override + public void initProgress(final int steps, + final long eta) { + if ( !hasInit ) { + handler.persistJobProperties(job.startProgress(steps, eta)); + hasInit = true; + } + } + + @Override + public void incrementProgressCount(final int steps) { + if ( hasInit ) { + handler.persistJobProperties(job.setProgress(steps)); + } + } + + @Override + public void updateProgress(final long eta) { + if ( hasInit ) { + handler.persistJobProperties(job.update(eta)); + } + } + + @Override + public void log(final String message, Object... 
args) { + handler.persistJobProperties(job.log(message, args)); + } + + @Override + public boolean isStopped() { + return handler.isStopped(); + } + + @Override + public void asyncProcessingFinished(final JobExecutionResult result) { + synchronized ( lock ) { + if ( isAsync.compareAndSet(true, false) ) { + services.jobConsumerManager.unregisterListener(job.getId()); + Job.JobState state = null; + if ( result.succeeded() ) { + state = Job.JobState.SUCCEEDED; + } else if ( result.failed() ) { + state = Job.JobState.QUEUED; + } else if ( result.cancelled() ) { + if ( handler.isStopped() ) { + state = Job.JobState.STOPPED; + } else { + state = Job.JobState.ERROR; + } + } + finishedJob(job.getId(), state, true); + asyncCounter.decrementAndGet(); + } else { + throw new IllegalStateException("Job is not processed async " + job.getId()); + } + } + } + + @Override + public ResultBuilder result() { + return new ResultBuilder() { + + private String message; + + private Long retryDelayInMs; + + @Override + public JobExecutionResult failed(final long retryDelayInMs) { + this.retryDelayInMs = retryDelayInMs; + return new JobExecutionResultImpl(InternalJobState.FAILED, message, retryDelayInMs); + } + + @Override + public ResultBuilder message(final String message) { + this.message = message; + return this; + } + + @Override + public JobExecutionResult succeeded() { + return new JobExecutionResultImpl(InternalJobState.SUCCEEDED, message, retryDelayInMs); + } + + @Override + public JobExecutionResult failed() { + return new JobExecutionResultImpl(InternalJobState.FAILED, message, retryDelayInMs); + } + + @Override + public JobExecutionResult cancelled() { + return new JobExecutionResultImpl(InternalJobState.CANCELLED, message, retryDelayInMs); + } + }; + } + }; + result = (JobExecutionResultImpl)consumer.process(job, ctx); + if ( result == null ) { // ASYNC processing + services.jobConsumerManager.registerListener(job.getId(), consumer, ctx); + asyncCounter.incrementAndGet(); + 
isAsync.set(true); + } else { + if ( result.succeeded() ) { + resultState = Job.JobState.SUCCEEDED; + } else if ( result.failed() ) { + resultState = Job.JobState.QUEUED; + } else if ( result.cancelled() ) { + if ( handler.isStopped() ) { + resultState = Job.JobState.STOPPED; + } else { + resultState = Job.JobState.ERROR; + } + } + } + } + } catch (final Throwable t) { //NOSONAR + logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t); + // we don't reschedule if an exception occurs + result = JobExecutionResultImpl.CANCELLED; + resultState = Job.JobState.ERROR; + } finally { + currentThread.setPriority(oldPriority); + currentThread.setName(oldName); + if ( result != null ) { + if ( result.getRetryDelayInMs() != null ) { + job.setProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, result.getRetryDelayInMs()); + } + if ( result.getMessage() != null ) { + job.setProperty(Job.PROPERTY_RESULT_MESSAGE, result.getMessage()); + } + finishedJob(job.getId(), resultState, false); + } + } + } + + }; + // check if the thread pool is available + final ThreadPool pool = this.threadPool; + if ( pool != null ) { + pool.execute(task); + } else { + // if we don't have a thread pool, we create the thread directly + // (this should never happen for jobs, but is a safe fall back) + new Thread(task).start(); + } + + } else { + // let's add the event to our started jobs list + synchronized ( this.startedJobsLists ) { + this.startedJobsLists.put(job.getId(), handler); + } + final Event jobEvent = this.getJobEvent(handler); + // we need async delivery, otherwise we might create a deadlock + // as this method runs inside a synchronized block and the finishedJob + // method as well! 
+ this.services.eventAdmin.postEvent(jobEvent); + } + return true; + + } catch (final Exception re) { + // if an exception occurs, we just log + this.logger.error("Exception during job processing.", re); } } else { - this.decQueued(); + if ( logger.isDebugEnabled() ) { + logger.debug("Discarding removed job {}", Utility.toString(job)); + } } - return ack != null; + return false; } private static final class RescheduleInfo { @@ -319,42 +583,41 @@ public abstract class AbstractJobQueue public long processingTime; } - private RescheduleInfo handleReschedule(final JobHandler jobEvent, final Job.JobState resultState) { + private RescheduleInfo handleReschedule(final JobHandler handler, final Job.JobState resultState) { final RescheduleInfo info = new RescheduleInfo(); switch ( resultState ) { case SUCCEEDED : // job is finished if ( this.logger.isDebugEnabled() ) { - this.logger.debug("Finished job {}", Utility.toString(jobEvent.getJob())); + this.logger.debug("Finished job {}", Utility.toString(handler.getJob())); } - info.processingTime = System.currentTimeMillis() - jobEvent.started; - this.finishedJob(info.processingTime); + info.processingTime = System.currentTimeMillis() - handler.started; + Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FINISHED, handler.getJob(), info.processingTime); break; case QUEUED : // check if we exceeded the number of retries - int retries = (Integer) jobEvent.getJob().getProperty(Job.PROPERTY_JOB_RETRIES); - int retryCount = (Integer)jobEvent.getJob().getProperty(Job.PROPERTY_JOB_RETRY_COUNT); + final int retries = (Integer) handler.getJob().getProperty(Job.PROPERTY_JOB_RETRIES); + int retryCount = (Integer)handler.getJob().getProperty(Job.PROPERTY_JOB_RETRY_COUNT); retryCount++; if ( retries != -1 && retryCount > retries ) { if ( this.logger.isDebugEnabled() ) { - this.logger.debug("Cancelled job {}", Utility.toString(jobEvent.getJob())); + this.logger.debug("Cancelled job {}", 
Utility.toString(handler.getJob())); } - this.cancelledJob(); + Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null); } else { info.reschedule = true; - // update event with retry count and retries - jobEvent.getJob().retry(); + this.reschedule(handler); if ( this.logger.isDebugEnabled() ) { - this.logger.debug("Failed job {}", Utility.toString(jobEvent.getJob())); + this.logger.debug("Failed job {}", Utility.toString(handler.getJob())); } - this.failedJob(); - jobEvent.queued = System.currentTimeMillis(); + handler.queued = System.currentTimeMillis(); + Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FAILED, handler.getJob(), null); } break; default : // consumer cancelled the job (STOPPED, GIVEN_UP, ERROR) if ( this.logger.isDebugEnabled() ) { - this.logger.debug("Cancelled job {}", Utility.toString(jobEvent.getJob())); + this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob())); } - this.cancelledJob(); + Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null); break; } @@ -379,7 +642,6 @@ public abstract class AbstractJobQueue if ( this.logger.isDebugEnabled() ) { this.logger.debug("Received finish for job {}, resultState={}", jobId, resultState); } - // let's remove the event from our processing list // this is just a sanity check, as usually the job should have been // removed during sendAcknowledge. 
synchronized ( this.startedJobsLists ) { @@ -388,8 +650,9 @@ public abstract class AbstractJobQueue // get job handler final JobHandler handler; - synchronized ( this.processsingJobsLists ) { - handler = this.processsingJobsLists.remove(jobId); + // let's remove the event from our processing list + synchronized ( this.processingJobsLists ) { + handler = this.processingJobsLists.remove(jobId); } if ( !this.running ) { @@ -409,338 +672,15 @@ public abstract class AbstractJobQueue if ( resultState == Job.JobState.QUEUED && !rescheduleInfo.reschedule ) { resultState = Job.JobState.GIVEN_UP; } - // if this is set after the synchronized block we have an error - final boolean finishSuccessful; if ( !rescheduleInfo.reschedule ) { // we keep cancelled jobs and succeeded jobs if the queue is configured like this. final boolean keepJobs = resultState != Job.JobState.SUCCEEDED || this.configuration.isKeepJobs(); handler.finished(resultState, keepJobs, rescheduleInfo.processingTime); - finishSuccessful = true; - if ( resultState == Job.JobState.SUCCEEDED ) { - Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_FINISHED, handler.getJob(), rescheduleInfo.processingTime); - } else { - Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null); - } - } else { - finishSuccessful = handler.reschedule(); - Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_FAILED, handler.getJob(), null); - } - - if ( !isAsync ) { - if ( !finishSuccessful || !rescheduleInfo.reschedule ) { - checkForNotify(null); - return false; - } - checkForNotify(handler); - } else { - // async result - if ( finishSuccessful && rescheduleInfo.reschedule ) { - final JobHandler reprocessHandler = this.reschedule(handler); - if ( reprocessHandler != null ) { - this.put(reprocessHandler); - } - } - } - return true; - } - - private void checkForNotify(final JobHandler info) { - JobHandler reprocessInfo = null; - if ( info != 
null ) { - reprocessInfo = this.reschedule(info); - } - notifyFinished(reprocessInfo); - } - - /** - * Get the name of the job queue. - */ - @Override - public String getName() { - return this.queueName; - } - - - /** - * Add a new job to the queue. - */ - public void process(final JobHandler handler) { - this.closeMarker.set(false); - handler.queued = System.currentTimeMillis(); - this.incQueued(); - this.put(handler); - } - - /** - * Check if the queue is suspended and go into suspend mode - */ - private void checkSuspended() { - synchronized ( this.suspendLock ) { - while ( this.suspendedSince != -1 ) { - try { - this.suspendLock.wait(MAX_SUSPEND_TIME); - } catch (final InterruptedException ignore) { - this.ignoreException(ignore); - Thread.currentThread().interrupt(); - } - if ( System.currentTimeMillis() > this.suspendedSince + MAX_SUSPEND_TIME ) { - this.resume(); - } - } } - } - - /** - * Execute the queue - */ - private void runJobQueue() { - JobHandler info = null; - while ( this.running ) { - if ( info == null ) { - // so let's wait/get the next job from the queue - info = this.take(); - } + this.notifyFinished(rescheduleInfo.reschedule); - if ( this.running ) { - checkSuspended(); - } - if ( info != null && this.running ) { - info = this.start(info); - } - } - } - - /** - * Execute a job - */ - protected boolean executeJob(final JobHandler handler) { - final JobImpl job = handler.getJob(); - final JobExecutor consumer = this.jobConsumerManager.getExecutor(job.getTopic()); - - if ( (consumer != null || (job.isBridgedEvent() && this.jobConsumerManager.supportsBridgedEvents())) ) { - if ( handler.startProcessing(this) ) { - if ( logger.isDebugEnabled() ) { - logger.debug("Starting job {}", Utility.toString(job)); - } - try { - handler.started = System.currentTimeMillis(); - - if ( consumer != null ) { - final long queueTime = handler.started - handler.queued; - this.addActive(queueTime); - Utility.sendNotification(this.eventAdmin, 
NotificationConstants.TOPIC_JOB_STARTED, job, queueTime); - synchronized ( this.processsingJobsLists ) { - this.processsingJobsLists.put(job.getId(), handler); - } - - final Runnable task = new Runnable() { - - /** - * @see java.lang.Runnable#run() - */ - @Override - public void run() { - final Object lock = new Object(); - final Thread currentThread = Thread.currentThread(); - // update priority and name - final String oldName = currentThread.getName(); - final int oldPriority = currentThread.getPriority(); - - currentThread.setName(oldName + "-" + job.getQueueName() + "(" + job.getTopic() + ")"); - if ( configuration.getThreadPriority() != null ) { - switch ( configuration.getThreadPriority() ) { - case NORM : currentThread.setPriority(Thread.NORM_PRIORITY); - break; - case MIN : currentThread.setPriority(Thread.MIN_PRIORITY); - break; - case MAX : currentThread.setPriority(Thread.MAX_PRIORITY); - break; - } - } - JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED; - Job.JobState resultState = Job.JobState.ERROR; - final AtomicBoolean isAsync = new AtomicBoolean(false); - - try { - synchronized ( lock ) { - final JobExecutionContext ctx = new JobExecutionContext() { - - private boolean hasInit = false; - - @Override - public void initProgress(final int steps, - final long eta) { - if ( !hasInit ) { - handler.persistJobProperties(job.startProgress(steps, eta)); - hasInit = true; - } - } - - @Override - public void incrementProgressCount(final int steps) { - if ( hasInit ) { - handler.persistJobProperties(job.setProgress(steps)); - } - } - - @Override - public void updateProgress(final long eta) { - if ( hasInit ) { - handler.persistJobProperties(job.update(eta)); - } - } - - @Override - public void log(final String message, Object... 
args) { - handler.persistJobProperties(job.log(message, args)); - } - - @Override - public boolean isStopped() { - return handler.isStopped(); - } - - @Override - public void asyncProcessingFinished(final JobExecutionResult result) { - synchronized ( lock ) { - if ( isAsync.compareAndSet(true, false) ) { - jobConsumerManager.unregisterListener(job.getId()); - Job.JobState state = null; - if ( result.succeeded() ) { - state = Job.JobState.SUCCEEDED; - } else if ( result.failed() ) { - state = Job.JobState.QUEUED; - } else if ( result.cancelled() ) { - if ( handler.isStopped() ) { - state = Job.JobState.STOPPED; - } else { - state = Job.JobState.ERROR; - } - } - finishedJob(job.getId(), state, true); - asyncCounter.decrementAndGet(); - } else { - throw new IllegalStateException("Job is not processed async " + job.getId()); - } - } - } - - @Override - public ResultBuilder result() { - return new ResultBuilder() { - - private String message; - - private Long retryDelayInMs; - - @Override - public JobExecutionResult failed(final long retryDelayInMs) { - this.retryDelayInMs = retryDelayInMs; - return new JobExecutionResultImpl(InternalJobState.FAILED, message, retryDelayInMs); - } - - @Override - public ResultBuilder message(final String message) { - this.message = message; - return this; - } - - @Override - public JobExecutionResult succeeded() { - return new JobExecutionResultImpl(InternalJobState.SUCCEEDED, message, retryDelayInMs); - } - - @Override - public JobExecutionResult failed() { - return new JobExecutionResultImpl(InternalJobState.FAILED, message, retryDelayInMs); - } - - @Override - public JobExecutionResult cancelled() { - return new JobExecutionResultImpl(InternalJobState.CANCELLED, message, retryDelayInMs); - } - }; - } - }; - result = (JobExecutionResultImpl)consumer.process(job, ctx); - if ( result == null ) { // ASYNC processing - jobConsumerManager.registerListener(job.getId(), consumer, ctx); - asyncCounter.incrementAndGet(); - notifyFinished(null); 
- isAsync.set(true); - } else { - if ( result.succeeded() ) { - resultState = Job.JobState.SUCCEEDED; - } else if ( result.failed() ) { - resultState = Job.JobState.QUEUED; - } else if ( result.cancelled() ) { - if ( handler.isStopped() ) { - resultState = Job.JobState.STOPPED; - } else { - resultState = Job.JobState.ERROR; - } - } - } - } - } catch (final Throwable t) { //NOSONAR - logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t); - // we don't reschedule if an exception occurs - result = JobExecutionResultImpl.CANCELLED; - resultState = Job.JobState.ERROR; - } finally { - currentThread.setPriority(oldPriority); - currentThread.setName(oldName); - if ( result != null ) { - if ( result.getRetryDelayInMs() != null ) { - job.setProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, result.getRetryDelayInMs()); - } - if ( result.getMessage() != null ) { - job.setProperty(Job.PROPERTY_RESULT_MESSAGE, result.getMessage()); - } - finishedJob(job.getId(), resultState, false); - } - } - } - - }; - // check if the thread pool is available - final ThreadPool pool = this.threadPool; - if ( pool != null ) { - pool.execute(task); - } else { - // if we don't have a thread pool, we create the thread directly - // (this should never happen for jobs, but is a safe fallback) - new Thread(task).start(); - } - - } else { - // let's add the event to our processing list - synchronized ( this.startedJobsLists ) { - this.startedJobsLists.put(job.getId(), handler); - } - final Event jobEvent = this.getJobEvent(handler); - // we need async delivery, otherwise we might create a deadlock - // as this method runs inside a synchronized block and the finishedJob - // method as well! 
- this.eventAdmin.postEvent(jobEvent); - } - return true; - - } catch (final Exception re) { - // if an exception occurs, we just log - this.logger.error("Exception during job processing.", re); - } - } else { - if ( logger.isDebugEnabled() ) { - logger.debug("Discarding removed job {}", Utility.toString(job)); - } - } - } else { - handler.reassign(); - } - this.decQueued(); - return false; + return rescheduleInfo.reschedule; } /** @@ -768,40 +708,27 @@ public abstract class AbstractJobQueue } /** - * Helper method which just logs the exception in debug mode. - * @param e + * @see org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event) */ - protected void ignoreException(Exception e) { - if ( this.logger.isDebugEnabled() ) { - this.logger.debug("Ignored exception " + e.getMessage(), e); + @Override + public boolean sendAcknowledge(final Event job) { + final String jobId = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_ID); + final JobHandler ack; + synchronized ( this.startedJobsLists ) { + ack = this.startedJobsLists.remove(jobId); } - } - - /** - * Is the queue outdated? - */ - protected boolean isOutdated() { - return this.isOutdated.get(); - } - - /** - * Outdate this queue. 
- */ - public void outdate() { - if ( !this.isOutdated() ) { - this.isOutdated.set(true); - final String name = this.getName() + "<outdated>(" + this.hashCode() + ")"; - this.logger.info("Outdating queue {}, renaming to {}.", this.queueName, name); - this.queueName = name; + // if the event is still in the started jobs list, we confirm the ack + if ( ack != null ) { + if ( logger.isDebugEnabled() ) { + logger.debug("Received ack for job {}", Utility.toString(ack.getJob())); + } + final long queueTime = ack.started - ack.queued; + Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, ack.getJob(), queueTime); + synchronized ( this.processingJobsLists ) { + this.processingJobsLists.put(jobId, ack); + } } - } - - /** - * @see org.apache.sling.event.jobs.Queue#getStatistics() - */ - @Override - public Statistics getStatistics() { - return this; + return ack != null; } /** @@ -841,37 +768,12 @@ public abstract class AbstractJobQueue } } - /** * @see org.apache.sling.event.jobs.Queue#removeAll() */ @Override public synchronized void removeAll() { - // we suspend the queue - final boolean wasSuspended = this.isSuspended(); - this.suspend(); - // we copy all events and remove them in the background - final Collection<JobHandler> events = this.removeAllJobs(); - this.clearQueued(); - final Thread t = new Thread(new Runnable() { - - /** - * @see java.lang.Runnable#run() - */ - @Override - public void run() { - for(final JobHandler job : events) { - job.cancel(); - Utility.sendNotification(eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, job.getJob(), null); - } - } - }, "Apache Sling Queue RemoveAll Thread for " + this.queueName); - t.setDaemon(true); - t.start(); - // start queue again - if ( !wasSuspended ) { - this.resume(); - } + this.services.topicManager.removeAll(this.getName()); } /** @@ -879,7 +781,7 @@ public abstract class AbstractJobQueue */ @Override public void clear() { - this.clearQueued(); + // this is a noop } /** 
@@ -891,6 +793,18 @@ public abstract class AbstractJobQueue return null; } + /** + * @see org.apache.sling.event.jobs.Queue#getStateInfo() + */ + @Override + public String getStateInfo() { + synchronized ( this.suspendLock ) { + return "isWaiting=" + this.isWaiting + + ", suspendedSince=" + this.suspendedSince + + ", asyncJobs=" + this.asyncCounter.get(); + } + } + protected long getRetryDelay(final JobHandler handler) { long delay = this.configuration.getRetryDelayInMs(); if ( handler.getJob().getProperty(JobImpl.PROPERTY_DELAY_OVERRIDE) != null ) { @@ -901,44 +815,38 @@ public abstract class AbstractJobQueue return delay; } - /** - * Reschedule a job. - */ - protected abstract JobHandler reschedule(final JobHandler info); - - /** - * Put another job into the queue. - */ - protected abstract void put(final JobHandler event); - - /** - * Get another job from the queue. - */ - protected abstract JobHandler take(); - - /** - * Is the queue empty? - */ - protected abstract boolean isEmpty(); + protected void reschedule(final JobHandler handler) { + // update event with retry count and retries + handler.reschedule(); + } /** - * Remove all events from the queue and return them. + * Helper method which just logs the exception in debug mode. 
+ * @param e */ - protected abstract Collection<JobHandler> removeAllJobs(); - - protected abstract JobHandler start(final JobHandler event); - - protected abstract void notifyFinished(final JobHandler rescheduleInfo); + protected void ignoreException(Exception e) { + if ( this.logger.isDebugEnabled() ) { + this.logger.debug("Ignored exception " + e.getMessage(), e); + } + } public boolean stopJob(final JobImpl job) { final JobHandler handler; - synchronized ( this.processsingJobsLists ) { - handler = this.processsingJobsLists.get(job.getId()); + synchronized ( this.processingJobsLists ) { + handler = this.processingJobsLists.get(job.getId()); } if ( handler != null ) { handler.stop(); } return handler != null; } + + /** + * Start processing of a new job. + * @param handler The new job handler + */ + protected abstract void start(final JobHandler handler); + + protected abstract void notifyFinished(boolean reschedule); }
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1632141&r1=1632140&r2=1632141&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Wed Oct 15 17:55:52 2014 @@ -18,19 +18,8 @@ */ package org.apache.sling.event.impl.jobs.queues; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.sling.commons.threads.ThreadPoolManager; -import org.apache.sling.event.impl.jobs.JobConsumerManager; import org.apache.sling.event.impl.jobs.JobHandler; import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration; -import org.osgi.service.event.EventAdmin; /** * An ordered job queue is processing the queue FIFO in a serialized @@ -40,54 +29,20 @@ import org.osgi.service.event.EventAdmin */ public final class OrderedJobQueue extends AbstractJobQueue { - /** The job handler for rescheduling. */ - private volatile JobHandler jobHandler; - - /** Lock and status object for handling the sleep phase. */ - private final SleepLock sleepLock = new SleepLock(); - - /** The queue - we use a set which is sorted by job creation date. 
*/ - private final Set<JobHandler> queue = new TreeSet<JobHandler>(new Comparator<JobHandler>() { - - @Override - public int compare(final JobHandler o1, final JobHandler o2) { - if ( o1.getJob() == null ) { - if ( o2.getJob() == null ) { - return 0; - } - return -1; - } - if ( o2.getJob() == null ) { - return 1; - } - int result = o1.getJob().getCreated().compareTo(o2.getJob().getCreated()); - if (result == 0 ) { - result = o1.getJob().getId().compareTo(o2.getJob().getId()); - } - return result; - } - }); - + /** Object to sync operations within this queue. */ private final Object syncLock = new Object(); + /** Sleep delay if job needs rescheduling. */ + private long sleepDelay = -1; + public OrderedJobQueue(final String name, final InternalQueueConfiguration config, - final JobConsumerManager jobConsumerManager, - final ThreadPoolManager threadPoolManager, - final EventAdmin eventAdmin) { - super(name, config, jobConsumerManager, threadPoolManager, eventAdmin); - } - - @Override - public String getStateInfo() { - return super.getStateInfo() + ", isSleepingUntil=" + this.sleepLock.sleepingSince; + final QueueServices services) { + super(name, config, services); } @Override - protected JobHandler start(final JobHandler handler) { - JobHandler rescheduleHandler = null; - - // if we are ordered we simply wait for the finish + protected void start(final JobHandler handler) { synchronized ( this.syncLock ) { if ( this.executeJob(handler) ) { this.isWaiting = true; @@ -100,151 +55,45 @@ public final class OrderedJobQueue exten Thread.currentThread().interrupt(); } } - this.logger.debug("Job queue {} is continuing.", this.queueName); - rescheduleHandler = this.jobHandler; - this.jobHandler = null; - } - } - return rescheduleHandler; - } - - private void wakeUp(final boolean discardJob) { - synchronized ( this.sleepLock ) { - if ( this.sleepLock.sleepingSince != -1 ) { - if ( discardJob ) { - this.sleepLock.jobHandler = null; - } - this.sleepLock.notify(); - } - } - } - - 
@Override - public void resume() { - this.wakeUp(false); - super.resume(); - } - - @Override - protected void put(final JobHandler handler) { - synchronized ( this.queue ) { - this.queue.add(handler); - this.queue.notify(); - this.isWaitingForNext = false; - } - } - - @Override - protected JobHandler take() { - synchronized ( this.queue ) { - while ( this.queue.isEmpty() ) { - this.isWaitingForNext = true; - try { - this.queue.wait(); - } catch (final InterruptedException e) { - this.ignoreException(e); - Thread.currentThread().interrupt(); + if ( this.sleepDelay > 0 ) { + final long waitingTime = this.sleepDelay; + this.sleepDelay = -1; + final long startTime = System.currentTimeMillis(); + this.logger.debug("Job queue {} is sleeping {}ms for retry.", this.queueName, waitingTime); + this.isWaiting = true; + while ( this.isWaiting ) { + try { + this.syncLock.wait(waitingTime); + if ( System.currentTimeMillis() >= startTime + waitingTime ) { + this.isWaiting = false; + } + } catch (final InterruptedException e) { + this.ignoreException(e); + Thread.currentThread().interrupt(); + } + } } - this.isWaitingForNext = false; + this.logger.debug("Job queue {} is continuing.", this.queueName); } - // get the first element and remove it - final Iterator<JobHandler> i = this.queue.iterator(); - final JobHandler result = i.next(); - i.remove(); - return result; } } @Override - protected boolean isEmpty() { - synchronized ( this.queue ) { - return this.queue.isEmpty(); - } + protected void reschedule(final JobHandler handler) { + super.reschedule(handler); + this.sleepDelay = this.getRetryDelay(handler); } @Override - protected void notifyFinished(final JobHandler rescheduleHandler) { - this.jobHandler = rescheduleHandler; + protected void notifyFinished(final boolean reschedule) { this.logger.debug("Notifying job queue {} to continue processing.", this.queueName); synchronized ( this.syncLock ) { this.isWaiting = false; - this.syncLock.notify(); - } - } - - @Override - 
protected JobHandler reschedule(final JobHandler handler) { - // we just sleep for the delay time - if none, we continue and retry - // this job again - final long delay = this.getRetryDelay(handler); - if ( delay > 0 ) { - synchronized ( this.sleepLock ) { - this.sleepLock.sleepingSince = System.currentTimeMillis(); - this.sleepLock.jobHandler = handler; - this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay); - try { - this.sleepLock.wait(delay); - } catch (final InterruptedException e) { - this.ignoreException(e); - Thread.currentThread().interrupt(); - } - this.sleepLock.sleepingSince = -1; - final JobHandler result = this.sleepLock.jobHandler; - this.sleepLock.jobHandler = null; - - if ( result == null ) { - handler.cancel(); - } - return result; + if ( !reschedule ) { + this.sleepDelay = -1; } + this.syncLock.notify(); } - return handler; - } - - /** - * @see org.apache.sling.event.jobs.Queue#clear() - */ - @Override - public void clear() { - synchronized ( this.queue ) { - this.queue.clear(); - } - super.clear(); - } - - @Override - public synchronized void removeAll() { - // remove all remaining jobs first - super.removeAll(); - this.jobHandler = null; - this.wakeUp(true); - } - - @Override - protected Collection<JobHandler> removeAllJobs() { - final List<JobHandler> events = new ArrayList<JobHandler>(); - synchronized ( this.queue ) { - events.addAll(this.queue); - this.queue.clear(); - } - return events; - } - - @Override - public Object getState(final String key) { - if ( "isSleepingUntil".equals(key) ) { - return this.sleepLock.sleepingSince; - } - return super.getState(key); - } - - private static final class SleepLock { - - /** Marker indicating that this queue is currently sleeping. */ - public volatile long sleepingSince = -1; - - /** The job event to be returned after sleeping. 
*/ - public volatile JobHandler jobHandler; } } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1632141&r1=1632140&r2=1632141&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Wed Oct 15 17:55:52 2014 @@ -18,81 +18,119 @@ */ package org.apache.sling.event.impl.jobs.queues; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.sling.commons.scheduler.Scheduler; -import org.apache.sling.commons.threads.ThreadPoolManager; -import org.apache.sling.event.impl.jobs.JobConsumerManager; +import java.util.Date; + import org.apache.sling.event.impl.jobs.JobHandler; import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration; -import org.osgi.service.event.EventAdmin; /** * The default parallel job queue processing the entries FIFO. * Failing jobs are rescheduled and put at the end of the queue. */ -public final class ParallelJobQueue extends AbstractParallelJobQueue { +public final class ParallelJobQueue extends AbstractJobQueue { + + private volatile int jobCount; - /** The queue. 
*/ - private final BlockingQueue<JobHandler> queue = new LinkedBlockingQueue<JobHandler>(); + private final Object syncLock = new Object(); public ParallelJobQueue(final String name, final InternalQueueConfiguration config, - final JobConsumerManager jobConsumerManager, - final ThreadPoolManager threadPoolManager, - final EventAdmin eventAdmin, - final Scheduler scheduler) { - super(name, config, jobConsumerManager, threadPoolManager, eventAdmin, scheduler); + final QueueServices services) { + super(name, config, services); } @Override - protected void put(final JobHandler event) { - try { - this.isWaitingForNext = false; - this.queue.put(event); - } catch (final InterruptedException e) { - this.ignoreException(e); - Thread.currentThread().interrupt(); - } + public String getStateInfo() { + return super.getStateInfo() + ", jobCount=" + this.jobCount; } @Override - protected JobHandler take() { - try { - this.isWaitingForNext = true; - return this.queue.take(); - } catch (final InterruptedException e) { - this.ignoreException(e); - Thread.currentThread().interrupt(); - } finally { - this.isWaitingForNext = false; + protected void start(final JobHandler processInfo) { + // acquire a slot + this.acquireSlot(); + + // check if we got outdated in the meantime + if ( this.isOutdated() ) { + this.freeSlot(); + return; + } + if ( !this.executeJob(processInfo) ) { + this.freeSlot(); } - return null; } - @Override - protected boolean isEmpty() { - return this.queue.isEmpty(); + /** + * Acquire a processing slot. + * This method is called if the queue is not ordered. 
+ */ + private void acquireSlot() { + synchronized ( this.syncLock ) { + if ( jobCount >= this.configuration.getMaxParallel() ) { + this.isWaiting = true; + this.logger.debug("Job queue {} is processing {} jobs - waiting for a free slot.", this.queueName, jobCount); + while ( this.isWaiting ) { + try { + this.syncLock.wait(); + } catch (final InterruptedException e) { + this.ignoreException(e); + Thread.currentThread().interrupt(); + } + } + this.logger.debug("Job queue {} is continuing.", this.queueName); + } + jobCount++; + } } /** - * @see org.apache.sling.event.jobs.Queue#clear() + * Free a slot when a job processing is finished. */ + private void freeSlot() { + synchronized ( this.syncLock ) { + jobCount--; + if ( this.isWaiting ) { + this.logger.debug("Notifying job queue {} to continue processing.", this.queueName); + this.isWaiting = false; + this.syncLock.notify(); + } + } + } + + @Override + protected boolean canBeClosed() { + boolean result = super.canBeClosed(); + if ( result ) { + result = this.jobCount == 0; + } + return result; + } + @Override - public void clear() { - this.queue.clear(); - super.clear(); + protected void notifyFinished(final boolean reschedule) { + this.freeSlot(); } @Override - protected Collection<JobHandler> removeAllJobs() { - final List<JobHandler> events = new ArrayList<JobHandler>(); - this.queue.drainTo(events); - return events; + protected void reschedule(final JobHandler handler) { + // we just sleep for the delay time - if none, we continue and retry + // this job again + final long delay = this.getRetryDelay(handler); + if ( delay > 0 ) { + final Date fireDate = new Date(); + fireDate.setTime(System.currentTimeMillis() + delay); + + final String jobName = "Waiting:" + queueName + ":" + handler.hashCode(); + final Runnable t = new Runnable() { + @Override + public void run() { + ParallelJobQueue.super.reschedule(handler); + } + }; + services.scheduler.schedule(t, services.scheduler.AT(fireDate).name(jobName)); + } else { 
+ // put directly into queue + super.reschedule(handler); + } } } Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1632141&view=auto ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (added) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Wed Oct 15 17:55:52 2014 @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.sling.event.impl.jobs.queues; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Properties; +import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.Service; +import org.apache.sling.commons.scheduler.Scheduler; +import org.apache.sling.commons.threads.ThreadPoolManager; +import org.apache.sling.event.impl.jobs.JobConsumerManager; +import org.apache.sling.event.impl.jobs.JobManagerConfiguration; +import org.apache.sling.event.impl.jobs.TestLogger; +import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration; +import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager; +import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo; +import org.apache.sling.event.impl.jobs.jmx.QueueStatusEvent; +import org.apache.sling.event.impl.jobs.jmx.QueuesMBeanImpl; +import org.apache.sling.event.impl.jobs.stats.StatisticsManager; +import org.apache.sling.event.impl.jobs.topics.TopicManager; +import org.apache.sling.event.impl.support.Environment; +import org.apache.sling.event.impl.support.ResourceHelper; +import org.apache.sling.event.jobs.Queue; +import org.apache.sling.event.jobs.QueueConfiguration; +import org.apache.sling.event.jobs.jmx.QueuesMBean; +import org.osgi.service.event.EventAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Implementation of the job manager. 
+ */ +@Component(immediate=true) +@Service(value={Runnable.class, QueueManager.class}) +@Properties({ + @Property(name="scheduler.period", longValue=60, propertyPrivate=true), + @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true) +}) +public class QueueManager + implements Runnable { + + /** Default logger. */ + private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass())); + + @Reference + private EventAdmin eventAdmin; + + @Reference + private Scheduler scheduler; + + @Reference + private JobConsumerManager jobConsumerManager; + + @Reference + private QueuesMBean queuesMBean; + + @Reference + private ThreadPoolManager threadPoolManager; + + /** The job manager configuration. */ + @Reference + private JobManagerConfiguration configuration; + + @Reference + private StatisticsManager statisticsManager; + + @Reference + private QueueConfigurationManager queueManager; + + /** Lock object for the queues map - we don't want to sync directly on the concurrent map. */ + private final Object queuesLock = new Object(); + + /** All active queues. */ + private final Map<String, AbstractJobQueue> queues = new ConcurrentHashMap<String, AbstractJobQueue>(); + + /** We count the scheduler runs. */ + private volatile long schedulerRuns; + + /** + * Activate this component. + * @param props Configuration properties + */ + @Activate + protected void activate(final Map<String, Object> props) { + logger.info("Apache Sling Queue Manager started on instance {}", Environment.APPLICATION_ID); + } + + /** + * Deactivate this component. 
+ */ + @Deactivate + protected void deactivate() { + logger.debug("Apache Sling Queue Manager stopping on instance {}", Environment.APPLICATION_ID); + + final Iterator<AbstractJobQueue> i = this.queues.values().iterator(); + while ( i.hasNext() ) { + final AbstractJobQueue jbq = i.next(); + jbq.close(); + // update mbeans + ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq)); + } + this.queues.clear(); + logger.info("Apache Sling Queue Manager stopped on instance {}", Environment.APPLICATION_ID); + } + + /** + * This method is invoked periodically by the scheduler. + * It searches for idle queues and stops them after a timeout. If a queue + * is idle for two consecutive clean up calls, it is removed. + * @see java.lang.Runnable#run() + */ + private void maintain() { + this.schedulerRuns++; + logger.debug("Job manager maintenance: Starting #{}", this.schedulerRuns); + + // check for unprocessed jobs first + logger.debug("Checking for unprocessed jobs..."); + for(final AbstractJobQueue jbq : this.queues.values() ) { + jbq.checkForUnprocessedJobs(); + } + + // we only do a full clean up on every fifth run + final boolean doFullCleanUp = (schedulerRuns % 5 == 0); + + if ( doFullCleanUp ) { + // check for idle queue + logger.debug("Checking for idle queues..."); + + // we synchronize to avoid creating a queue which is about to be removed during cleanup + synchronized ( queuesLock ) { + final Iterator<Map.Entry<String, AbstractJobQueue>> i = this.queues.entrySet().iterator(); + while ( i.hasNext() ) { + final Map.Entry<String, AbstractJobQueue> current = i.next(); + final AbstractJobQueue jbq = current.getValue(); + if ( jbq.tryToClose() ) { + logger.debug("Removing idle job queue {}", jbq); + // remove + i.remove(); + // update mbeans + ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq)); + } + } + } + } + logger.debug("Job manager maintenance: Finished #{}", this.schedulerRuns); + } + + /** + * Start a new queue + * This 
method first searches the corresponding queue - if such a queue + * does not exist yet, it is created and started. + * + * @param topic The topic + */ + public void start(final TopicManager topicManager, final QueueInfo queueInfo) { + final InternalQueueConfiguration config = queueInfo.queueConfiguration; + // get or create queue + AbstractJobQueue queue = null; + // we synchronize to avoid creating a queue which is about to be removed during cleanup + synchronized ( queuesLock ) { + queue = this.queues.get(queueInfo.queueName); + // check for reconfiguration, we really do an identity check here(!) + if ( queue != null && queue.getConfiguration() != config ) { + this.outdateQueue(queue); + // we use a new queue with the configuration + queue = null; + } + if ( queue == null ) { + final QueueServices services = new QueueServices(); + services.eventAdmin = this.eventAdmin; + services.jobConsumerManager = this.jobConsumerManager; + services.scheduler = this.scheduler; + services.threadPoolManager = this.threadPoolManager; + services.topicManager = topicManager; + services.statisticsManager = statisticsManager; + if ( config.getType() == QueueConfiguration.Type.ORDERED ) { + queue = new OrderedJobQueue(queueInfo.queueName, config, services); + } else if ( config.getType() == QueueConfiguration.Type.UNORDERED ) { + queue = new ParallelJobQueue(queueInfo.queueName, config, services); + } else if ( config.getType() == QueueConfiguration.Type.TOPIC_ROUND_ROBIN ) { + queue = new ParallelJobQueue(queueInfo.queueName, config, services); + } + // this is just a sanity check, actually we always have a queue instance here + if ( queue != null ) { + queues.put(queueInfo.queueName, queue); + ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null)); + queue.start(); + } + } + } + } + + /** + * This method is invoked periodically by the scheduler. 
+ * In the default configuration every minute + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + this.maintain(); + } + + private void outdateQueue(final AbstractJobQueue queue) { + // remove the queue with the old name + // check for main queue + final String oldName = ResourceHelper.filterQueueName(queue.getName()); + this.queues.remove(oldName); + // check if we can close or have to rename + if ( queue.tryToClose() ) { + // copy statistics + // update mbeans + ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, queue)); + } else { + queue.outdate(); + // readd with new name + String newName = ResourceHelper.filterName(queue.getName()); + int index = 0; + while ( this.queues.containsKey(newName) ) { + newName = ResourceHelper.filterName(queue.getName()) + '$' + String.valueOf(index++); + } + this.queues.put(newName, queue); + // update mbeans + ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, queue)); + } + } + + /** + * @see org.apache.sling.event.jobs.JobManager#restart() + */ + public void restart() { + // let's rename/close all queues and clear them + synchronized ( queuesLock ) { + final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values()); + for(final AbstractJobQueue queue : queues ) { + queue.clear(); + this.outdateQueue(queue); + } + } + } + + private void stopProcessing() { + // let's rename/close all queues and clear them + synchronized ( queuesLock ) { + final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values()); + for(final AbstractJobQueue queue : queues ) { + queue.clear(); + this.outdateQueue(queue); + } + } + } + + /** + * @see org.apache.sling.event.jobs.JobManager#getQueue(java.lang.String) + */ + public Queue getQueue(final String name) { + return this.queues.get(name); + } + + /** + * @see org.apache.sling.event.jobs.JobManager#getQueues() + */ + public Iterable<Queue> getQueues() { + final Iterator<AbstractJobQueue> 
jqI = this.queues.values().iterator(); + return new Iterable<Queue>() { + + @Override + public Iterator<Queue> iterator() { + return new Iterator<Queue>() { + + @Override + public boolean hasNext() { + return jqI.hasNext(); + } + + @Override + public Queue next() { + return jqI.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + } +} Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java?rev=1632141&view=auto ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java (added) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java Wed Oct 15 17:55:52 2014 @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl.jobs.queues; + +import org.apache.sling.commons.scheduler.Scheduler; +import org.apache.sling.commons.threads.ThreadPoolManager; +import org.apache.sling.event.impl.jobs.JobConsumerManager; +import org.apache.sling.event.impl.jobs.stats.StatisticsManager; +import org.apache.sling.event.impl.jobs.topics.TopicManager; +import org.osgi.service.event.EventAdmin; + +public class QueueServices { + + public JobConsumerManager jobConsumerManager; + + public EventAdmin eventAdmin; + + public ThreadPoolManager threadPoolManager; + + public Scheduler scheduler; + + public TopicManager topicManager; + + public StatisticsManager statisticsManager; +} Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java ------------------------------------------------------------------------------ svn:mime-type = text/plain