Author: cziegeler
Date: Wed Jan 28 09:27:55 2009
New Revision: 738413
URL: http://svn.apache.org/viewvc?rev=738413&view=rev
Log:
SLING-809 : Add possibility to force reprocessing of a named queue.
Added:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
(with props)
Modified:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Modified:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java
URL:
http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java?rev=738413&r1=738412&r2=738413&view=diff
==============================================================================
---
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java
(original)
+++
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java
Wed Jan 28 09:27:55 2009
@@ -100,4 +100,12 @@
* @param jobId The unique identifer as put into the property {...@link
EventUtil#PROPERTY_JOB_ID}.
*/
void cancelJob(String topic, String jobId);
+
+ /**
+ * Wake up the named job queue.
+ * If a job failed, the job queue waits (sleeps) for a configured time. By
calling this
+ * method, the job queue can be woken up and force an immediate
reprocessing.
+ * @param jobQueueName The name of the queue.
+ */
+ void wakeUpJobQueue(final String jobQueueName);
}
Added:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
URL:
http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java?rev=738413&view=auto
==============================================================================
---
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
(added)
+++
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
Wed Jan 28 09:27:55 2009
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.event.impl.AbstractRepositoryEventHandler.EventInfo;
+
+/**
+ * The job blocking queue extends the blocking queue by some
+ * functionality for the job event handling.
+ */
+public final class JobBlockingQueue extends LinkedBlockingQueue<EventInfo> {
+
+ private EventInfo eventInfo;
+
+ private final Object lock = new Object();
+
+ private boolean isWaiting = false;
+
+ private boolean markForCleanUp = false;
+
+ private boolean finished = false;
+
+ private boolean isSleeping = false;
+
+ private String schedulerJobName;
+ private Thread sleepingThread;
+
+ public EventInfo waitForFinish() throws InterruptedException {
+ this.isWaiting = true;
+ this.markForCleanUp = false;
+ this.lock.wait();
+ this.isWaiting = false;
+ final EventInfo object = this.eventInfo;
+ this.eventInfo = null;
+ return object;
+ }
+
+ public void markForCleanUp() {
+ if ( !this.isWaiting ) {
+ this.markForCleanUp = true;
+ }
+ }
+
+ public boolean isMarkedForCleanUp() {
+ return !this.isWaiting && this.markForCleanUp;
+ }
+
+ public void notifyFinish(EventInfo i) {
+ this.eventInfo = i;
+ this.lock.notify();
+ }
+
+ public Object getLock() {
+ return lock;
+ }
+
+ public boolean isWaiting() {
+ return this.isWaiting;
+ }
+
+ public boolean isFinished() {
+ return finished;
+ }
+
+ public void setFinished(boolean flag) {
+ this.finished = flag;
+ }
+
+ public void setSleeping(boolean flag) {
+ this.isSleeping = flag;
+ if ( !flag ) {
+ this.schedulerJobName = null;
+ this.sleepingThread = null;
+ }
+ }
+
+ public void setSleeping(boolean flag, String schedulerJobName) {
+ this.schedulerJobName = schedulerJobName;
+ this.setSleeping(flag);
+ }
+
+ public void setSleeping(boolean flag, Thread sleepingThread) {
+ this.sleepingThread = sleepingThread;
+ this.setSleeping(flag);
+ }
+
+ public String getSchedulerJobName() {
+ return this.schedulerJobName;
+ }
+
+ public Thread getSleepingThread() {
+ return this.sleepingThread;
+ }
+
+ public boolean isSleeping() {
+ return this.isSleeping;
+ }
+}
+
Propchange:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL:
http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=738413&r1=738412&r2=738413&view=diff
==============================================================================
---
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
(original)
+++
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Wed Jan 28 09:27:55 2009
@@ -31,7 +31,6 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import javax.jcr.Item;
import javax.jcr.ItemExistsException;
@@ -541,10 +540,13 @@
// this job again
if (
job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
final long delay =
(Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+ jobQueue.setSleeping(true,
Thread.currentThread());
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
this.ignoreException(e);
+ } finally {
+ jobQueue.setSleeping(false);
}
}
info = newInfo;
@@ -555,18 +557,22 @@
final Date fireDate = new Date();
fireDate.setTime(System.currentTimeMillis() + delay);
+ final String schedulerJobName = "Waiting:"
+ queueName;
final Runnable t = new Runnable() {
public void run() {
+ jobQueue.setSleeping(true,
schedulerJobName);
try {
jobQueue.put(newEventInfo);
} catch (InterruptedException e) {
// this should never happen
ignoreException(e);
+ } finally {
+ jobQueue.setSleeping(false);
}
}
};
try {
- this.scheduler.fireJobAt(null, t,
null, fireDate);
+
this.scheduler.fireJobAt(schedulerJobName, t, null, fireDate);
} catch (Exception e) {
// we ignore the exception and just
put back the job in the queue
ignoreException(e);
@@ -1411,60 +1417,28 @@
}
- private static final class JobBlockingQueue extends
LinkedBlockingQueue<EventInfo> {
-
- private EventInfo eventInfo;
-
- private final Object lock = new Object();
-
- private boolean isWaiting = false;
-
- private boolean markForCleanUp = false;
-
- private boolean finished = false;
-
- public EventInfo waitForFinish() throws InterruptedException {
- this.isWaiting = true;
- this.markForCleanUp = false;
- this.lock.wait();
- this.isWaiting = false;
- final EventInfo object = this.eventInfo;
- this.eventInfo = null;
- return object;
- }
-
- public void markForCleanUp() {
- if ( !this.isWaiting ) {
- this.markForCleanUp = true;
+ /**
+ * @see
org.apache.sling.event.JobStatusProvider#wakeUpJobQueue(java.lang.String)
+ */
+ public void wakeUpJobQueue(String jobQueueName) {
+ if ( jobQueueName != null ) {
+ synchronized ( this.jobQueues ) {
+ final JobBlockingQueue queue =
this.jobQueues.get(jobQueueName);
+ if ( queue != null && queue.isSleeping() ) {
+ final String schedulerJobName =
queue.getSchedulerJobName();
+ final Thread thread = queue.getSleepingThread();
+ if ( schedulerJobName != null ) {
+ this.scheduler.removeJob(schedulerJobName);
+ }
+ if ( thread != null ) {
+ thread.interrupt();
+ }
+ }
}
}
-
- public boolean isMarkedForCleanUp() {
- return !this.isWaiting && this.markForCleanUp;
- }
-
- public void notifyFinish(EventInfo i) {
- this.eventInfo = i;
- this.lock.notify();
- }
-
- public Object getLock() {
- return lock;
- }
-
- public boolean isWaiting() {
- return this.isWaiting;
- }
-
- public boolean isFinished() {
- return finished;
- }
-
- public void setFinished(boolean flag) {
- this.finished = flag;
- }
}
+
private static final class StartedJobInfo {
public final Event event;
public final String nodePath;