Author: cziegeler
Date: Tue Oct 8 14:42:46 2013
New Revision: 1530297
URL: http://svn.apache.org/r1530297
Log:
SLING-3041 : Mark job as failed if async job consumer disappears
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1530297&r1=1530296&r2=1530297&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java Tue Oct 8 14:42:46 2013
@@ -97,6 +97,8 @@ public class JobConsumerManager {
private BundleContext bundleContext;
+ private final Map<String, Object[]> listenerMap = new HashMap<String,
Object[]>();
+
private Dictionary<String, Object> getRegistrationProperties() {
final Dictionary<String, Object> serviceProps = new Hashtable<String,
Object>();
serviceProps.put(PropertyProvider.PROPERTY_PROPERTIES,
TopologyCapabilities.PROPERTY_TOPICS);
@@ -162,6 +164,7 @@ public class JobConsumerManager {
this.bundleContext = null;
synchronized ( this.topicToConsumerMap ) {
this.topicToConsumerMap.clear();
+ this.listenerMap.clear();
}
}
@@ -188,6 +191,18 @@ public class JobConsumerManager {
return null;
}
+ public void registerListener(final String key, final JobExecutor consumer,
final JobExecutionContext handler) {
+ synchronized ( this.topicToConsumerMap ) {
+ this.listenerMap.put(key, new Object[] {consumer, handler});
+ }
+ }
+
+ public void unregisterListener(final String key) {
+ synchronized ( this.topicToConsumerMap ) {
+ this.listenerMap.remove(key);
+ }
+ }
+
/**
* Return the topics information of this instance.
*/
@@ -289,6 +304,17 @@ public class JobConsumerManager {
if ( topic.length() > 0 ) {
final List<ConsumerInfo> consumers =
this.topicToConsumerMap.get(topic);
if ( consumers != null ) { // sanity check
+ for(final ConsumerInfo oldConsumer :
consumers) {
+ if ( oldConsumer.equals(info) &&
oldConsumer.executor != null ) {
+ // notify listener
+ for(final Object[] listenerObjects :
this.listenerMap.values()) {
+ if ( listenerObjects[0] ==
oldConsumer.executor ) {
+
((JobExecutionContext)listenerObjects[1]).asyncProcessingFinished(JobStatus.FAILED);
+ break;
+ }
+ }
+ }
+ }
consumers.remove(info);
if ( consumers.size() == 0 ) {
this.topicToConsumerMap.remove(topic);
@@ -358,7 +384,7 @@ public class JobConsumerManager {
public final ServiceReference serviceReference;
private final boolean isConsumer;
- private JobExecutor executor;
+ public JobExecutor executor;
public final int ranking;
public final long serviceId;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1530297&r1=1530296&r2=1530297&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Tue Oct 8 14:42:46 2013
@@ -544,7 +544,7 @@ public abstract class AbstractJobQueue
try {
synchronized ( lock ) {
- result = consumer.process(job, new
JobExecutionContext() {
+ final JobExecutionContext ctx = new
JobExecutionContext() {
private boolean hasInit = false;
@@ -585,6 +585,7 @@ public abstract class AbstractJobQueue
public void
asyncProcessingFinished(final JobStatus status) {
synchronized ( lock ) {
if (
isAsync.compareAndSet(true, false) ) {
+
jobConsumerManager.unregisterListener(job.getId());
finishedJob(job.getId(), status, true);
asyncCounter.decrementAndGet();
} else {
@@ -592,8 +593,10 @@ public abstract class AbstractJobQueue
}
}
}
- });
+ };
+ result = consumer.process(job, ctx);
if ( result == null ) { // ASYNC
processing
+
jobConsumerManager.registerListener(job.getId(), consumer, ctx);
asyncCounter.incrementAndGet();
notifyFinished(null);
isAsync.set(true);