Author: cziegeler
Date: Tue Oct  8 14:42:46 2013
New Revision: 1530297

URL: http://svn.apache.org/r1530297
Log:
SLING-3041 : Mark job as failed if async job consumer disappears

Modified:
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1530297&r1=1530296&r2=1530297&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
 Tue Oct  8 14:42:46 2013
@@ -97,6 +97,8 @@ public class JobConsumerManager {
 
     private BundleContext bundleContext;
 
+    private final Map<String, Object[]> listenerMap = new HashMap<String, 
Object[]>();
+
     private Dictionary<String, Object> getRegistrationProperties() {
         final Dictionary<String, Object> serviceProps = new Hashtable<String, 
Object>();
         serviceProps.put(PropertyProvider.PROPERTY_PROPERTIES, 
TopologyCapabilities.PROPERTY_TOPICS);
@@ -162,6 +164,7 @@ public class JobConsumerManager {
         this.bundleContext = null;
         synchronized ( this.topicToConsumerMap ) {
             this.topicToConsumerMap.clear();
+            this.listenerMap.clear();
         }
     }
 
@@ -188,6 +191,18 @@ public class JobConsumerManager {
         return null;
     }
 
+    public void registerListener(final String key, final JobExecutor consumer, 
final JobExecutionContext handler) {
+        synchronized ( this.topicToConsumerMap ) {
+            this.listenerMap.put(key, new Object[] {consumer, handler});
+        }
+    }
+
+    public void unregisterListener(final String key) {
+        synchronized ( this.topicToConsumerMap ) {
+            this.listenerMap.remove(key);
+        }
+    }
+
     /**
      * Return the topics information of this instance.
      */
@@ -289,6 +304,17 @@ public class JobConsumerManager {
                         if ( topic.length() > 0 ) {
                             final List<ConsumerInfo> consumers = 
this.topicToConsumerMap.get(topic);
                             if ( consumers != null ) { // sanity check
+                                for(final ConsumerInfo oldConsumer : 
consumers) {
+                                    if ( oldConsumer.equals(info) && 
oldConsumer.executor != null ) {
+                                        // notify listener
+                                        for(final Object[] listenerObjects : 
this.listenerMap.values()) {
+                                            if ( listenerObjects[0] == 
oldConsumer.executor ) {
+                                                
((JobExecutionContext)listenerObjects[1]).asyncProcessingFinished(JobStatus.FAILED);
+                                                break;
+                                            }
+                                        }
+                                    }
+                                }
                                 consumers.remove(info);
                                 if ( consumers.size() == 0 ) {
                                     this.topicToConsumerMap.remove(topic);
@@ -358,7 +384,7 @@ public class JobConsumerManager {
 
         public final ServiceReference serviceReference;
         private final boolean isConsumer;
-        private JobExecutor executor;
+        public JobExecutor executor;
         public final int ranking;
         public final long serviceId;
 

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1530297&r1=1530296&r2=1530297&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
 Tue Oct  8 14:42:46 2013
@@ -544,7 +544,7 @@ public abstract class AbstractJobQueue
 
                                 try {
                                     synchronized ( lock ) {
-                                        result = consumer.process(job, new 
JobExecutionContext() {
+                                        final JobExecutionContext ctx = new 
JobExecutionContext() {
 
                                             private boolean hasInit = false;
 
@@ -585,6 +585,7 @@ public abstract class AbstractJobQueue
                                             public void 
asyncProcessingFinished(final JobStatus status) {
                                                 synchronized ( lock ) {
                                                     if ( 
isAsync.compareAndSet(true, false) ) {
+                                                        
jobConsumerManager.unregisterListener(job.getId());
                                                         
finishedJob(job.getId(), status, true);
                                                         
asyncCounter.decrementAndGet();
                                                     } else {
@@ -592,8 +593,10 @@ public abstract class AbstractJobQueue
                                                     }
                                                 }
                                             }
-                                        });
+                                        };
+                                        result = consumer.process(job, ctx);
                                         if ( result == null ) { // ASYNC 
processing
+                                            
jobConsumerManager.registerListener(job.getId(), consumer, ctx);
                                             asyncCounter.incrementAndGet();
                                             notifyFinished(null);
                                             isAsync.set(true);


Reply via email to