Author: cziegeler
Date: Mon Sep 16 16:35:49 2013
New Revision: 1523723

URL: http://svn.apache.org/r1523723
Log:
SLING-3028 : Support for progress tracking of jobs

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1523723&r1=1523722&r2=1523723&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java Mon Sep 16 16:35:49 2013 @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Hashtable; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; @@ -35,12 +36,18 @@ import org.apache.felix.scr.annotations. 
import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.ReferencePolicy; +import org.apache.felix.scr.annotations.References; import org.apache.felix.scr.annotations.Service; import org.apache.sling.commons.osgi.PropertiesUtil; import org.apache.sling.discovery.PropertyProvider; import org.apache.sling.event.impl.support.TopicMatcher; import org.apache.sling.event.impl.support.TopicMatcherHelper; +import org.apache.sling.event.jobs.Job; import org.apache.sling.event.jobs.consumer.JobConsumer; +import org.apache.sling.event.jobs.consumer.JobConsumer.JobResult; +import org.apache.sling.event.jobs.consumer.JobExecutionContext; +import org.apache.sling.event.jobs.consumer.JobExecutor; +import org.apache.sling.event.jobs.consumer.JobStatus; import org.osgi.framework.BundleContext; import org.osgi.framework.Constants; import org.osgi.framework.ServiceReference; @@ -54,9 +61,14 @@ import org.slf4j.LoggerFactory; description="%job.consumermanager.description", metatype=true) @Service(value=JobConsumerManager.class) -@Reference(referenceInterface=JobConsumer.class, - cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE, - policy=ReferencePolicy.DYNAMIC) +@References({ + @Reference(referenceInterface=JobConsumer.class, + cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE, + policy=ReferencePolicy.DYNAMIC), + @Reference(referenceInterface=JobExecutor.class, + cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE, + policy=ReferencePolicy.DYNAMIC) +}) @Property(name="org.apache.sling.installer.configuration.persist", boolValue=false, propertyPrivate=true) public class JobConsumerManager { @@ -154,22 +166,22 @@ public class JobConsumerManager { } /** - * Get the consumer for the topic. + * Get the executor for the topic. 
* @param topic The job topic * @return A consumer or <code>null</code> */ - public JobConsumer getConsumer(final String topic) { + public JobExecutor getExecutor(final String topic) { synchronized ( this.topicToConsumerMap ) { final List<ConsumerInfo> consumers = this.topicToConsumerMap.get(topic); if ( consumers != null ) { - return consumers.get(0).getConsumer(this.bundleContext); + return consumers.get(0).getExecutor(this.bundleContext); } final int pos = topic.lastIndexOf('/'); if ( pos > 0 ) { final String category = topic.substring(0, pos + 1).concat("*"); final List<ConsumerInfo> categoryConsumers = this.topicToConsumerMap.get(category); if ( categoryConsumers != null ) { - return categoryConsumers.get(0).getConsumer(this.bundleContext); + return categoryConsumers.get(0).getExecutor(this.bundleContext); } } } @@ -195,9 +207,42 @@ public class JobConsumerManager { * @param serviceReference The service reference to the consumer. */ protected void bindJobConsumer(final ServiceReference serviceReference) { + this.bindService(serviceReference, true); + } + + /** + * Unbind a consumer + * @param serviceReference The service reference to the consumer. + */ + protected void unbindJobConsumer(final ServiceReference serviceReference) { + this.unbindService(serviceReference, true); + } + + /** + * Bind a new executor + * @param serviceReference The service reference to the executor. + */ + protected void bindJobExecutor(final ServiceReference serviceReference) { + this.bindService(serviceReference, false); + } + + /** + * Unbind a executor + * @param serviceReference The service reference to the executor. + */ + protected void unbindJobExecutor(final ServiceReference serviceReference) { + this.unbindService(serviceReference, false); + } + + /** + * Bind a consumer or executor + * @param serviceReference The service reference to the consumer or executor. 
+ * @param isConsumer Indicating whether this is a JobConsumer or JobExecutor + */ + private void bindService(final ServiceReference serviceReference, final boolean isConsumer) { final String[] topics = PropertiesUtil.toStringArray(serviceReference.getProperty(JobConsumer.PROPERTY_TOPICS)); if ( topics != null && topics.length > 0 ) { - final ConsumerInfo info = new ConsumerInfo(serviceReference); + final ConsumerInfo info = new ConsumerInfo(serviceReference, isConsumer); boolean changed = false; synchronized ( this.topicToConsumerMap ) { for(final String t : topics) { @@ -228,13 +273,14 @@ public class JobConsumerManager { } /** - * Unbind a consumer - * @param serviceReference The service reference to the consumer. + * Unbind a consumer or executor + * @param serviceReference The service reference to the consumer or executor. + * @param isConsumer Indicating whether this is a JobConsumer or JobExecutor */ - protected void unbindJobConsumer(final ServiceReference serviceReference) { + private void unbindService(final ServiceReference serviceReference, final boolean isConsumer) { final String[] topics = PropertiesUtil.toStringArray(serviceReference.getProperty(JobConsumer.PROPERTY_TOPICS)); if ( topics != null && topics.length > 0 ) { - final ConsumerInfo info = new ConsumerInfo(serviceReference); + final ConsumerInfo info = new ConsumerInfo(serviceReference, isConsumer); boolean changed = false; synchronized ( this.topicToConsumerMap ) { for(final String t : topics) { @@ -311,12 +357,14 @@ public class JobConsumerManager { private final static class ConsumerInfo implements Comparable<ConsumerInfo> { public final ServiceReference serviceReference; - private JobConsumer consumer; + private final boolean isConsumer; + private JobExecutor executor; public final int ranking; public final long serviceId; - public ConsumerInfo(final ServiceReference serviceReference) { + public ConsumerInfo(final ServiceReference serviceReference, final boolean isConsumer) { 
this.serviceReference = serviceReference; + this.isConsumer = isConsumer; final Object sr = serviceReference.getProperty(Constants.SERVICE_RANKING); if ( sr == null || !(sr instanceof Integer)) { this.ranking = 0; @@ -350,11 +398,70 @@ public class JobConsumerManager { return serviceReference.hashCode(); } - public JobConsumer getConsumer(final BundleContext bundleContext) { - if ( consumer == null ) { - consumer = (JobConsumer) bundleContext.getService(this.serviceReference); + public JobExecutor getExecutor(final BundleContext bundleContext) { + if ( executor == null ) { + if ( this.isConsumer ) { + executor = new JobConsumerWrapper((JobConsumer) bundleContext.getService(this.serviceReference)); + } else { + executor = (JobExecutor) bundleContext.getService(this.serviceReference); + } + } + return executor; + } + } + + private final static class JobConsumerWrapper implements JobExecutor { + + private final JobConsumer consumer; + + public JobConsumerWrapper(final JobConsumer consumer) { + this.consumer = consumer; + } + + @Override + public JobStatus process(final Job job, final JobExecutionContext context) { + final JobConsumer.AsyncHandler asyncHandler = + new JobConsumer.AsyncHandler() { + + final Object asyncLock = new Object(); + final AtomicBoolean asyncDone = new AtomicBoolean(false); + + private void check(final JobStatus result) { + synchronized ( asyncLock ) { + if ( !asyncDone.get() ) { + asyncDone.set(true); + context.asyncProcessingFinished(result); + } else { + throw new IllegalStateException("Job is already marked as processed"); + } + } + } + + @Override + public void ok() { + this.check(JobStatus.OK); + } + + @Override + public void failed() { + this.check(JobStatus.FAILED); + } + + @Override + public void cancel() { + this.check(JobStatus.CANCEL); + } + }; + ((JobImpl)job).setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER, asyncHandler); + final JobConsumer.JobResult result = this.consumer.process(job); + if ( result == JobResult.ASYNC ) { + 
return JobStatus.ASYNC; + } else if ( result == JobResult.FAILED) { + return JobStatus.FAILED; + } else if ( result == JobResult.OK) { + return JobStatus.OK; } - return consumer; + return JobStatus.CANCEL; } } } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1523723&r1=1523722&r2=1523723&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Mon Sep 16 16:35:49 2013 @@ -77,7 +77,7 @@ import org.apache.sling.event.jobs.Queue import org.apache.sling.event.jobs.QueueConfiguration; import org.apache.sling.event.jobs.Statistics; import org.apache.sling.event.jobs.TopicStatistics; -import org.apache.sling.event.jobs.consumer.JobConsumer; +import org.apache.sling.event.jobs.consumer.JobExecutor; import org.apache.sling.event.jobs.jmx.QueuesMBean; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; @@ -279,7 +279,7 @@ public class JobManagerImpl */ void process(final JobImpl job) { // check if we still are able to process this job - final JobConsumer consumer = this.jobConsumerManager.getConsumer(job.getTopic()); + final JobExecutor consumer = this.jobConsumerManager.getExecutor(job.getTopic()); boolean reassign = false; String reassignTargetId = null; if ( consumer == null && (!job.isBridgedEvent() || !this.jobConsumerManager.supportsBridgedEvents())) { Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1523723&r1=1523722&r2=1523723&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Mon Sep 16 16:35:49 2013 @@ -43,8 +43,9 @@ import org.apache.sling.event.jobs.Job; import org.apache.sling.event.jobs.JobUtil; import org.apache.sling.event.jobs.Queue; import org.apache.sling.event.jobs.Statistics; -import org.apache.sling.event.jobs.consumer.JobConsumer; -import org.apache.sling.event.jobs.consumer.JobConsumer.JobResult; +import org.apache.sling.event.jobs.consumer.JobExecutionContext; +import org.apache.sling.event.jobs.consumer.JobExecutor; +import org.apache.sling.event.jobs.consumer.JobStatus; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; import org.slf4j.Logger; @@ -296,9 +297,9 @@ public abstract class AbstractJobQueue return ack != null; } - private boolean handleReschedule(final JobHandler jobEvent, final JobConsumer.JobResult result) { + private boolean handleReschedule(final JobHandler jobEvent, final JobStatus result) { boolean reschedule = false; - switch ( result ) { + switch ( result.getState() ) { case OK : // job is finished if ( this.logger.isDebugEnabled() ) { this.logger.debug("Finished job {}", Utility.toString(jobEvent.getJob())); @@ -351,11 +352,11 @@ public abstract class AbstractJobQueue @Override public boolean finishedJob(final Event job, final boolean shouldReschedule) { final String location = (String)job.getProperty(JobUtil.JOB_ID); - return this.finishedJob(location, shouldReschedule ? JobResult.FAILED : JobResult.OK, false); + return this.finishedJob(location, shouldReschedule ? 
JobStatus.FAILED : JobStatus.OK, false); } private boolean finishedJob(final String jobId, - final JobConsumer.JobResult result, + final JobStatus result, final boolean isAsync) { if ( this.logger.isDebugEnabled() ) { this.logger.debug("Received finish for job {}, result={}", jobId, result); @@ -487,7 +488,7 @@ public abstract class AbstractJobQueue */ protected boolean executeJob(final JobHandler handler) { final JobImpl job = handler.getJob(); - final JobConsumer consumer = this.jobConsumerManager.getConsumer(job.getTopic()); + final JobExecutor consumer = this.jobConsumerManager.getExecutor(job.getTopic()); if ( (consumer != null || (job.isBridgedEvent() && this.jobConsumerManager.supportsBridgedEvents())) ) { if ( handler.start() ) { @@ -528,58 +529,72 @@ public abstract class AbstractJobQueue break; } } - JobConsumer.JobResult result = JobConsumer.JobResult.CANCEL; - final JobConsumer.AsyncHandler asyncHandler = - new JobConsumer.AsyncHandler() { - - final Object asyncLock = new Object(); - final AtomicBoolean asyncDone = new AtomicBoolean(false); - - private void check(final JobConsumer.JobResult result) { - synchronized ( asyncLock ) { - if ( !asyncDone.get() ) { - asyncDone.set(true); - finishedJob(job.getId(), result, true); - asyncCounter.decrementAndGet(); - } else { - throw new IllegalStateException("Job is already marked as processed"); - } - } + JobStatus result = JobStatus.CANCEL; + final AtomicBoolean isAsync = new AtomicBoolean(false); + + try { + synchronized ( job ) { + result = consumer.process(job, new JobExecutionContext() { + + @Override + public void update(long eta) { + // TODO Auto-generated method stub + } @Override - public void ok() { - this.check(JobConsumer.JobResult.OK); + public void startProgress(long eta) { + // TODO Auto-generated method stub + } @Override - public void failed() { - this.check(JobConsumer.JobResult.FAILED); + public void startProgress(int steps) { + // TODO Auto-generated method stub + } @Override - public void 
cancel() { - this.check(JobConsumer.JobResult.CANCEL); + public void setProgress(int step) { + // TODO Auto-generated method stub + } - }; - job.setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER, asyncHandler); - try { - result = consumer.process(job); + + @Override + public void log(String message, Object... args) { + // TODO Auto-generated method stub + + } + + @Override + public void asyncProcessingFinished(final JobStatus status) { + synchronized ( job ) { + if ( isAsync.compareAndSet(true, false) ) { + finishedJob(job.getId(), status, true); + asyncCounter.decrementAndGet(); + } else { + throw new IllegalStateException("Job is not processed async " + job.getId()); + } + } + } + }); + if ( result.getState() == JobStatus.JobState.ASYNC ) { + asyncCounter.incrementAndGet(); + notifyFinished(null); + isAsync.set(true); + } + } } catch (final Throwable t) { //NOSONAR logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t); // we don't reschedule if an exception occurs - result = JobConsumer.JobResult.CANCEL; + result = JobStatus.CANCEL; } finally { currentThread.setPriority(oldPriority); currentThread.setName(oldName); - if ( result != JobConsumer.JobResult.ASYNC ) { + if ( result.getState() != JobStatus.JobState.ASYNC ) { finishedJob(job.getId(), result, false); } } - if ( result == JobConsumer.JobResult.ASYNC ) { - asyncCounter.incrementAndGet(); - notifyFinished(null); - } } }; Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java?rev=1523723&r1=1523722&r2=1523723&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java (original) 
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java Mon Sep 16 16:35:49 2013 @@ -30,6 +30,7 @@ public interface JobExecutionContext { /** * Report an async result. * @throws IllegalStateException If the job is not processed asynchronously + * or if this method has already been called. */ void asyncProcessingFinished(final JobStatus status); Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java?rev=1523723&r1=1523722&r2=1523723&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java Mon Sep 16 16:35:49 2013 @@ -29,7 +29,15 @@ import aQute.bnd.annotation.ProviderType @ProviderType public final class JobStatus { - enum JobState { + public static final JobStatus OK = new JobStatus(JobState.OK, null); + + public static final JobStatus FAILED = new JobStatus(JobState.FAILED, null); + + public static final JobStatus CANCEL = new JobStatus(JobState.CANCEL, null); + + public static final JobStatus ASYNC = new JobStatus(JobState.ASYNC, null); + + public enum JobState { OK, // processing finished FAILED, // processing failed, can be retried CANCEL, // processing failed permanently @@ -40,15 +48,16 @@ public final class JobStatus { private final String message; - private Long retryDelay; + private final Long retryDelay; - public JobStatus(final JobState result) { - this(result, null); + public JobStatus(final JobState result, final String message) { + this(result, message, null); } - public JobStatus(final JobState result, final String message) { + public 
JobStatus(final JobState result, final String message, final Long retryDelayInMs) { this.state = result; this.message = message; + this.retryDelay = retryDelayInMs; } public JobState getState() { @@ -66,7 +75,9 @@ public final class JobStatus { return this.retryDelay; } - public void setRetryDelayInMs(final Long value) { - this.retryDelay = value; + @Override + public String toString() { + return "JobStatus [state=" + state + ", message=" + message + + ", retryDelay=" + retryDelay + "]"; } }