Author: cziegeler
Date: Mon Sep 16 16:35:49 2013
New Revision: 1523723

URL: http://svn.apache.org/r1523723
Log:
SLING-3028 :  Support for progress tracking of jobs 

Modified:
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1523723&r1=1523722&r2=1523723&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
 Mon Sep 16 16:35:49 2013
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -35,12 +36,18 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.References;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.discovery.PropertyProvider;
 import org.apache.sling.event.impl.support.TopicMatcher;
 import org.apache.sling.event.impl.support.TopicMatcherHelper;
+import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.apache.sling.event.jobs.consumer.JobConsumer.JobResult;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
+import org.apache.sling.event.jobs.consumer.JobStatus;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceReference;
@@ -54,9 +61,14 @@ import org.slf4j.LoggerFactory;
            description="%job.consumermanager.description",
            metatype=true)
 @Service(value=JobConsumerManager.class)
-@Reference(referenceInterface=JobConsumer.class,
-           cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
-           policy=ReferencePolicy.DYNAMIC)
+@References({
+    @Reference(referenceInterface=JobConsumer.class,
+            cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
+            policy=ReferencePolicy.DYNAMIC),
+    @Reference(referenceInterface=JobExecutor.class,
+            cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
+            policy=ReferencePolicy.DYNAMIC)
+})
 @Property(name="org.apache.sling.installer.configuration.persist", 
boolValue=false, propertyPrivate=true)
 public class JobConsumerManager {
 
@@ -154,22 +166,22 @@ public class JobConsumerManager {
     }
 
     /**
-     * Get the consumer for the topic.
+     * Get the executor for the topic.
      * @param topic The job topic
      * @return A consumer or <code>null</code>
      */
-    public JobConsumer getConsumer(final String topic) {
+    public JobExecutor getExecutor(final String topic) {
         synchronized ( this.topicToConsumerMap ) {
             final List<ConsumerInfo> consumers = 
this.topicToConsumerMap.get(topic);
             if ( consumers != null ) {
-                return consumers.get(0).getConsumer(this.bundleContext);
+                return consumers.get(0).getExecutor(this.bundleContext);
             }
             final int pos = topic.lastIndexOf('/');
             if ( pos > 0 ) {
                 final String category = topic.substring(0, pos + 
1).concat("*");
                 final List<ConsumerInfo> categoryConsumers = 
this.topicToConsumerMap.get(category);
                 if ( categoryConsumers != null ) {
-                    return 
categoryConsumers.get(0).getConsumer(this.bundleContext);
+                    return 
categoryConsumers.get(0).getExecutor(this.bundleContext);
                 }
             }
         }
@@ -195,9 +207,42 @@ public class JobConsumerManager {
      * @param serviceReference The service reference to the consumer.
      */
     protected void bindJobConsumer(final ServiceReference serviceReference) {
+        this.bindService(serviceReference, true);
+    }
+
+    /**
+     * Unbind a consumer
+     * @param serviceReference The service reference to the consumer.
+     */
+    protected void unbindJobConsumer(final ServiceReference serviceReference) {
+        this.unbindService(serviceReference, true);
+    }
+
+    /**
+     * Bind a new executor
+     * @param serviceReference The service reference to the executor.
+     */
+    protected void bindJobExecutor(final ServiceReference serviceReference) {
+        this.bindService(serviceReference, false);
+    }
+
+    /**
+     * Unbind a executor
+     * @param serviceReference The service reference to the executor.
+     */
+    protected void unbindJobExecutor(final ServiceReference serviceReference) {
+        this.unbindService(serviceReference, false);
+    }
+
+    /**
+     * Bind a consumer or executor
+     * @param serviceReference The service reference to the consumer or 
executor.
+     * @param isConsumer Indicating whether this is a JobConsumer or 
JobExecutor
+     */
+    private void bindService(final ServiceReference serviceReference, final 
boolean isConsumer) {
         final String[] topics = 
PropertiesUtil.toStringArray(serviceReference.getProperty(JobConsumer.PROPERTY_TOPICS));
         if ( topics != null && topics.length > 0 ) {
-            final ConsumerInfo info = new ConsumerInfo(serviceReference);
+            final ConsumerInfo info = new ConsumerInfo(serviceReference, 
isConsumer);
             boolean changed = false;
             synchronized ( this.topicToConsumerMap ) {
                 for(final String t : topics) {
@@ -228,13 +273,14 @@ public class JobConsumerManager {
     }
 
     /**
-     * Unbind a consumer
-     * @param serviceReference The service reference to the consumer.
+     * Unbind a consumer or executor
+     * @param serviceReference The service reference to the consumer or 
executor.
+     * @param isConsumer Indicating whether this is a JobConsumer or 
JobExecutor
      */
-    protected void unbindJobConsumer(final ServiceReference serviceReference) {
+    private void unbindService(final ServiceReference serviceReference, final 
boolean isConsumer) {
         final String[] topics = 
PropertiesUtil.toStringArray(serviceReference.getProperty(JobConsumer.PROPERTY_TOPICS));
         if ( topics != null && topics.length > 0 ) {
-            final ConsumerInfo info = new ConsumerInfo(serviceReference);
+            final ConsumerInfo info = new ConsumerInfo(serviceReference, 
isConsumer);
             boolean changed = false;
             synchronized ( this.topicToConsumerMap ) {
                 for(final String t : topics) {
@@ -311,12 +357,14 @@ public class JobConsumerManager {
     private final static class ConsumerInfo implements 
Comparable<ConsumerInfo> {
 
         public final ServiceReference serviceReference;
-        private JobConsumer consumer;
+        private final boolean isConsumer;
+        private JobExecutor executor;
         public final int ranking;
         public final long serviceId;
 
-        public ConsumerInfo(final ServiceReference serviceReference) {
+        public ConsumerInfo(final ServiceReference serviceReference, final 
boolean isConsumer) {
             this.serviceReference = serviceReference;
+            this.isConsumer = isConsumer;
             final Object sr = 
serviceReference.getProperty(Constants.SERVICE_RANKING);
             if ( sr == null || !(sr instanceof Integer)) {
                 this.ranking = 0;
@@ -350,11 +398,70 @@ public class JobConsumerManager {
             return serviceReference.hashCode();
         }
 
-        public JobConsumer getConsumer(final BundleContext bundleContext) {
-            if ( consumer == null ) {
-                consumer = (JobConsumer) 
bundleContext.getService(this.serviceReference);
+        public JobExecutor getExecutor(final BundleContext bundleContext) {
+            if ( executor == null ) {
+                if ( this.isConsumer ) {
+                    executor = new JobConsumerWrapper((JobConsumer) 
bundleContext.getService(this.serviceReference));
+                } else {
+                    executor = (JobExecutor) 
bundleContext.getService(this.serviceReference);
+                }
+            }
+            return executor;
+        }
+    }
+
+    private final static class JobConsumerWrapper implements JobExecutor {
+
+        private final JobConsumer consumer;
+
+        public JobConsumerWrapper(final JobConsumer consumer) {
+            this.consumer = consumer;
+        }
+
+        @Override
+        public JobStatus process(final Job job, final JobExecutionContext 
context) {
+            final JobConsumer.AsyncHandler asyncHandler =
+                    new JobConsumer.AsyncHandler() {
+
+                        final Object asyncLock = new Object();
+                        final AtomicBoolean asyncDone = new 
AtomicBoolean(false);
+
+                        private void check(final JobStatus result) {
+                            synchronized ( asyncLock ) {
+                                if ( !asyncDone.get() ) {
+                                    asyncDone.set(true);
+                                    context.asyncProcessingFinished(result);
+                                } else {
+                                    throw new IllegalStateException("Job is 
already marked as processed");
+                                }
+                            }
+                        }
+
+                        @Override
+                        public void ok() {
+                            this.check(JobStatus.OK);
+                        }
+
+                        @Override
+                        public void failed() {
+                            this.check(JobStatus.FAILED);
+                        }
+
+                        @Override
+                        public void cancel() {
+                            this.check(JobStatus.CANCEL);
+                        }
+                    };
+            ((JobImpl)job).setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER, 
asyncHandler);
+            final JobConsumer.JobResult result = this.consumer.process(job);
+            if ( result == JobResult.ASYNC ) {
+                return JobStatus.ASYNC;
+            } else if ( result == JobResult.FAILED) {
+                return JobStatus.FAILED;
+            } else if ( result == JobResult.OK) {
+                return JobStatus.OK;
             }
-            return consumer;
+            return JobStatus.CANCEL;
         }
     }
 }

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1523723&r1=1523722&r2=1523723&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
 Mon Sep 16 16:35:49 2013
@@ -77,7 +77,7 @@ import org.apache.sling.event.jobs.Queue
 import org.apache.sling.event.jobs.QueueConfiguration;
 import org.apache.sling.event.jobs.Statistics;
 import org.apache.sling.event.jobs.TopicStatistics;
-import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
 import org.apache.sling.event.jobs.jmx.QueuesMBean;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
@@ -279,7 +279,7 @@ public class JobManagerImpl
      */
     void process(final JobImpl job) {
         // check if we still are able to process this job
-        final JobConsumer consumer = 
this.jobConsumerManager.getConsumer(job.getTopic());
+        final JobExecutor consumer = 
this.jobConsumerManager.getExecutor(job.getTopic());
         boolean reassign = false;
         String reassignTargetId = null;
         if ( consumer == null && (!job.isBridgedEvent() || 
!this.jobConsumerManager.supportsBridgedEvents())) {

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1523723&r1=1523722&r2=1523723&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
 Mon Sep 16 16:35:49 2013
@@ -43,8 +43,9 @@ import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobUtil;
 import org.apache.sling.event.jobs.Queue;
 import org.apache.sling.event.jobs.Statistics;
-import org.apache.sling.event.jobs.consumer.JobConsumer;
-import org.apache.sling.event.jobs.consumer.JobConsumer.JobResult;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
+import org.apache.sling.event.jobs.consumer.JobStatus;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
@@ -296,9 +297,9 @@ public abstract class AbstractJobQueue
         return ack != null;
     }
 
-    private boolean handleReschedule(final JobHandler jobEvent, final 
JobConsumer.JobResult result) {
+    private boolean handleReschedule(final JobHandler jobEvent, final 
JobStatus result) {
         boolean reschedule = false;
-        switch ( result ) {
+        switch ( result.getState() ) {
             case OK : // job is finished
                 if ( this.logger.isDebugEnabled() ) {
                     this.logger.debug("Finished job {}", 
Utility.toString(jobEvent.getJob()));
@@ -351,11 +352,11 @@ public abstract class AbstractJobQueue
     @Override
     public boolean finishedJob(final Event job, final boolean 
shouldReschedule) {
         final String location = (String)job.getProperty(JobUtil.JOB_ID);
-        return this.finishedJob(location, shouldReschedule ? JobResult.FAILED 
: JobResult.OK, false);
+        return this.finishedJob(location, shouldReschedule ? JobStatus.FAILED 
: JobStatus.OK, false);
     }
 
     private boolean finishedJob(final String jobId,
-                                final JobConsumer.JobResult result,
+                                final JobStatus result,
                                 final boolean isAsync) {
         if ( this.logger.isDebugEnabled() ) {
             this.logger.debug("Received finish for job {}, result={}", jobId, 
result);
@@ -487,7 +488,7 @@ public abstract class AbstractJobQueue
      */
     protected boolean executeJob(final JobHandler handler) {
         final JobImpl job = handler.getJob();
-        final JobConsumer consumer = 
this.jobConsumerManager.getConsumer(job.getTopic());
+        final JobExecutor consumer = 
this.jobConsumerManager.getExecutor(job.getTopic());
 
         if ( (consumer != null || (job.isBridgedEvent() && 
this.jobConsumerManager.supportsBridgedEvents())) ) {
             if ( handler.start() ) {
@@ -528,58 +529,72 @@ public abstract class AbstractJobQueue
                                                     break;
                                     }
                                 }
-                                JobConsumer.JobResult result = 
JobConsumer.JobResult.CANCEL;
-                                final JobConsumer.AsyncHandler asyncHandler =
-                                        new JobConsumer.AsyncHandler() {
-
-                                            final Object asyncLock = new 
Object();
-                                            final AtomicBoolean asyncDone = 
new AtomicBoolean(false);
-
-                                            private void check(final 
JobConsumer.JobResult result) {
-                                                synchronized ( asyncLock ) {
-                                                    if ( !asyncDone.get() ) {
-                                                        asyncDone.set(true);
-                                                        
finishedJob(job.getId(), result, true);
-                                                        
asyncCounter.decrementAndGet();
-                                                    } else {
-                                                        throw new 
IllegalStateException("Job is already marked as processed");
-                                                    }
-                                                }
+                                JobStatus result = JobStatus.CANCEL;
+                                final AtomicBoolean isAsync = new 
AtomicBoolean(false);
+
+                                try {
+                                    synchronized ( job ) {
+                                        result = consumer.process(job, new 
JobExecutionContext() {
+
+                                            @Override
+                                            public void update(long eta) {
+                                                // TODO Auto-generated method 
stub
+
                                             }
 
                                             @Override
-                                            public void ok() {
-                                                
this.check(JobConsumer.JobResult.OK);
+                                            public void startProgress(long 
eta) {
+                                                // TODO Auto-generated method 
stub
+
                                             }
 
                                             @Override
-                                            public void failed() {
-                                                
this.check(JobConsumer.JobResult.FAILED);
+                                            public void startProgress(int 
steps) {
+                                                // TODO Auto-generated method 
stub
+
                                             }
 
                                             @Override
-                                            public void cancel() {
-                                                
this.check(JobConsumer.JobResult.CANCEL);
+                                            public void setProgress(int step) {
+                                                // TODO Auto-generated method 
stub
+
                                             }
-                                        };
-                                
job.setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER, asyncHandler);
-                                try {
-                                    result = consumer.process(job);
+
+                                            @Override
+                                            public void log(String message, 
Object... args) {
+                                                // TODO Auto-generated method 
stub
+
+                                            }
+
+                                            @Override
+                                            public void 
asyncProcessingFinished(final JobStatus status) {
+                                                synchronized ( job ) {
+                                                    if ( 
isAsync.compareAndSet(true, false) ) {
+                                                        
finishedJob(job.getId(), status, true);
+                                                        
asyncCounter.decrementAndGet();
+                                                    } else {
+                                                        throw new 
IllegalStateException("Job is not processed async " + job.getId());
+                                                    }
+                                                }
+                                            }
+                                        });
+                                        if ( result.getState() == 
JobStatus.JobState.ASYNC ) {
+                                            asyncCounter.incrementAndGet();
+                                            notifyFinished(null);
+                                            isAsync.set(true);
+                                        }
+                                    }
                                 } catch (final Throwable t) { //NOSONAR
                                     logger.error("Unhandled error occured in 
job processor " + t.getMessage() + " while processing job " + 
Utility.toString(job), t);
                                     // we don't reschedule if an exception 
occurs
-                                    result = JobConsumer.JobResult.CANCEL;
+                                    result = JobStatus.CANCEL;
                                 } finally {
                                     currentThread.setPriority(oldPriority);
                                     currentThread.setName(oldName);
-                                    if ( result != JobConsumer.JobResult.ASYNC 
) {
+                                    if ( result.getState() != 
JobStatus.JobState.ASYNC ) {
                                         finishedJob(job.getId(), result, 
false);
                                     }
                                 }
-                                if ( result == JobConsumer.JobResult.ASYNC ) {
-                                    asyncCounter.incrementAndGet();
-                                    notifyFinished(null);
-                                }
                             }
 
                         };

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java?rev=1523723&r1=1523722&r2=1523723&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java
 Mon Sep 16 16:35:49 2013
@@ -30,6 +30,7 @@ public interface JobExecutionContext {
     /**
      * Report an async result.
      * @throws IllegalStateException If the job is not processed asynchronously
+     *                               or if this method has already been called.
      */
     void asyncProcessingFinished(final JobStatus status);
 

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java?rev=1523723&r1=1523722&r2=1523723&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java
 Mon Sep 16 16:35:49 2013
@@ -29,7 +29,15 @@ import aQute.bnd.annotation.ProviderType
 @ProviderType
 public final class JobStatus {
 
-    enum JobState {
+    public static final JobStatus OK = new JobStatus(JobState.OK, null);
+
+    public static final JobStatus FAILED = new JobStatus(JobState.FAILED, 
null);
+
+    public static final JobStatus CANCEL = new JobStatus(JobState.CANCEL, 
null);
+
+    public static final JobStatus ASYNC = new JobStatus(JobState.ASYNC, null);
+
+    public enum JobState {
         OK,      // processing finished
         FAILED,  // processing failed, can be retried
         CANCEL,  // processing failed permanently
@@ -40,15 +48,16 @@ public final class JobStatus {
 
     private final String message;
 
-    private Long retryDelay;
+    private final Long retryDelay;
 
-    public JobStatus(final JobState result) {
-        this(result, null);
+    public JobStatus(final JobState result, final String message) {
+        this(result, message, null);
     }
 
-    public JobStatus(final JobState result, final String message) {
+    public JobStatus(final JobState result, final String message, final Long 
retryDelayInMs) {
         this.state = result;
         this.message = message;
+        this.retryDelay = retryDelayInMs;
     }
 
     public JobState getState() {
@@ -66,7 +75,9 @@ public final class JobStatus {
         return this.retryDelay;
     }
 
-    public void setRetryDelayInMs(final Long value) {
-        this.retryDelay = value;
+    @Override
+    public String toString() {
+        return "JobStatus [state=" + state + ", message=" + message
+                + ", retryDelay=" + retryDelay + "]";
     }
 }


Reply via email to