Author: cziegeler
Date: Wed Jan 28 09:27:55 2009
New Revision: 738413

URL: http://svn.apache.org/viewvc?rev=738413&view=rev
Log:
SLING-809 : Add possibility to force reprocessing of a named queue.

Added:
    
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
   (with props)
Modified:
    
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java
    
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java

Modified: 
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java
URL: 
http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java?rev=738413&r1=738412&r2=738413&view=diff
==============================================================================
--- 
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java
 (original)
+++ 
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java
 Wed Jan 28 09:27:55 2009
@@ -100,4 +100,12 @@
      * @param jobId The unique identifer as put into the property {...@link 
EventUtil#PROPERTY_JOB_ID}.
      */
     void cancelJob(String topic, String jobId);
+
+    /**
+     * Wake up the named job queue.
+     * If a job failed, the job queue waits (sleeps) for a configured time. By 
calling this
+     * method, the job queue can be woken up and force an immediate 
reprocessing.
+     * @param jobQueueName The name of the queue.
+     */
+    void wakeUpJobQueue(final String jobQueueName);
 }

Added: 
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java?rev=738413&view=auto
==============================================================================
--- 
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
 (added)
+++ 
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
 Wed Jan 28 09:27:55 2009
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.event.impl.AbstractRepositoryEventHandler.EventInfo;
+
+/**
+ * The job blocking queue extends the blocking queue by some
+ * functionality for the job event handling.
+ */
+public final class JobBlockingQueue extends LinkedBlockingQueue<EventInfo> {
+
+    private EventInfo eventInfo;
+
+    private final Object lock = new Object();
+
+    private boolean isWaiting = false;
+
+    private boolean markForCleanUp = false;
+
+    private boolean finished = false;
+
+    private boolean isSleeping = false;
+
+    private String schedulerJobName;
+    private Thread sleepingThread;
+
+    public EventInfo waitForFinish() throws InterruptedException {
+        this.isWaiting = true;
+        this.markForCleanUp = false;
+        this.lock.wait();
+        this.isWaiting = false;
+        final EventInfo object = this.eventInfo;
+        this.eventInfo = null;
+        return object;
+    }
+
+    public void markForCleanUp() {
+        if ( !this.isWaiting ) {
+            this.markForCleanUp = true;
+        }
+    }
+
+    public boolean isMarkedForCleanUp() {
+        return !this.isWaiting && this.markForCleanUp;
+    }
+
+    public void notifyFinish(EventInfo i) {
+        this.eventInfo = i;
+        this.lock.notify();
+    }
+
+    public Object getLock() {
+        return lock;
+    }
+
+    public boolean isWaiting() {
+        return this.isWaiting;
+    }
+
+    public boolean isFinished() {
+        return finished;
+    }
+
+    public void setFinished(boolean flag) {
+        this.finished = flag;
+    }
+
+    public void setSleeping(boolean flag) {
+        this.isSleeping = flag;
+        if ( !flag ) {
+            this.schedulerJobName = null;
+            this.sleepingThread = null;
+        }
+    }
+
+    public void setSleeping(boolean flag, String schedulerJobName) {
+        this.schedulerJobName = schedulerJobName;
+        this.setSleeping(flag);
+    }
+
+    public void setSleeping(boolean flag, Thread sleepingThread) {
+        this.sleepingThread = sleepingThread;
+        this.setSleeping(flag);
+    }
+
+    public String getSchedulerJobName() {
+        return this.schedulerJobName;
+    }
+
+    public Thread getSleepingThread() {
+        return this.sleepingThread;
+    }
+
+    public boolean isSleeping() {
+        return this.isSleeping;
+    }
+}
+

Propchange: 
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: 
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: 
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=738413&r1=738412&r2=738413&view=diff
==============================================================================
--- 
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
 (original)
+++ 
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
 Wed Jan 28 09:27:55 2009
@@ -31,7 +31,6 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.jcr.Item;
 import javax.jcr.ItemExistsException;
@@ -541,10 +540,13 @@
                                 // this job again
                                 if ( 
job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
                                     final long delay = 
(Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+                                    jobQueue.setSleeping(true, 
Thread.currentThread());
                                     try {
                                         Thread.sleep(delay);
                                     } catch (InterruptedException e) {
                                         this.ignoreException(e);
+                                    } finally {
+                                        jobQueue.setSleeping(false);
                                     }
                                 }
                                 info = newInfo;
@@ -555,18 +557,22 @@
                                     final Date fireDate = new Date();
                                     
fireDate.setTime(System.currentTimeMillis() + delay);
 
+                                    final String schedulerJobName = "Waiting:" 
+ queueName;
                                     final Runnable t = new Runnable() {
                                         public void run() {
+                                            jobQueue.setSleeping(true, 
schedulerJobName);
                                             try {
                                                 jobQueue.put(newEventInfo);
                                             } catch (InterruptedException e) {
                                                 // this should never happen
                                                 ignoreException(e);
+                                            } finally {
+                                                jobQueue.setSleeping(false);
                                             }
                                         }
                                     };
                                     try {
-                                        this.scheduler.fireJobAt(null, t, 
null, fireDate);
+                                        
this.scheduler.fireJobAt(schedulerJobName, t, null, fireDate);
                                     } catch (Exception e) {
                                         // we ignore the exception and just 
put back the job in the queue
                                         ignoreException(e);
@@ -1411,60 +1417,28 @@
     }
 
 
-    private static final class JobBlockingQueue extends 
LinkedBlockingQueue<EventInfo> {
-
-        private EventInfo eventInfo;
-
-        private final Object lock = new Object();
-
-        private boolean isWaiting = false;
-
-        private boolean markForCleanUp = false;
-
-        private boolean finished = false;
-
-        public EventInfo waitForFinish() throws InterruptedException {
-            this.isWaiting = true;
-            this.markForCleanUp = false;
-            this.lock.wait();
-            this.isWaiting = false;
-            final EventInfo object = this.eventInfo;
-            this.eventInfo = null;
-            return object;
-        }
-
-        public void markForCleanUp() {
-            if ( !this.isWaiting ) {
-                this.markForCleanUp = true;
+    /**
+     * @see 
org.apache.sling.event.JobStatusProvider#wakeUpJobQueue(java.lang.String)
+     */
+    public void wakeUpJobQueue(String jobQueueName) {
+        if ( jobQueueName != null ) {
+            synchronized ( this.jobQueues ) {
+                final JobBlockingQueue queue = 
this.jobQueues.get(jobQueueName);
+                if ( queue != null && queue.isSleeping() ) {
+                    final String schedulerJobName = 
queue.getSchedulerJobName();
+                    final Thread thread = queue.getSleepingThread();
+                    if ( schedulerJobName != null ) {
+                        this.scheduler.removeJob(schedulerJobName);
+                    }
+                    if ( thread != null ) {
+                        thread.interrupt();
+                    }
+                }
             }
         }
-
-        public boolean isMarkedForCleanUp() {
-            return !this.isWaiting && this.markForCleanUp;
-        }
-
-        public void notifyFinish(EventInfo i) {
-            this.eventInfo = i;
-            this.lock.notify();
-        }
-
-        public Object getLock() {
-            return lock;
-        }
-
-        public boolean isWaiting() {
-            return this.isWaiting;
-        }
-
-        public boolean isFinished() {
-            return finished;
-        }
-
-        public void setFinished(boolean flag) {
-            this.finished = flag;
-        }
     }
 
+
     private static final class StartedJobInfo {
         public final Event event;
         public final String nodePath;


Reply via email to