Author: cziegeler Date: Fri Oct 15 06:53:29 2010 New Revision: 1022833 URL: http://svn.apache.org/viewvc?rev=1022833&view=rev Log: Allow topic matching with trailing slash (like event admin topic configs) Remove all starts a background thread for actual removing Create only three level hierarchy for jobs Add a method to retrieve queue state info in a generic way
Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1022833&r1=1022832&r2=1022833&view=diff ============================================================================== --- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original) +++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Fri Oct 15 06:53:29 2010 @@ -128,22 +128,26 @@ public class Utility { // we create an md from the job id - we use the first 6 bytes to // create sub directories final String md5 = md5(jobId); - sb.append(md5.substring(0, 2)); + sb.append(md5.charAt(0)); + sb.append(md5.charAt(1)); + sb.append(md5.charAt(2)); sb.append('/'); - sb.append(md5.substring(2, 4)); - sb.append('/'); - sb.append(md5.substring(4, 6)); + sb.append(md5.charAt(3)); + sb.append(md5.charAt(4)); + sb.append(md5.charAt(5)); sb.append('/'); sb.append(filter(jobId)); } else { // create a path from the uuid - we use the first 6 bytes to // create sub directories final String uuid = UUID.randomUUID().toString(); - sb.append(uuid.substring(0, 2)); - sb.append('/'); - sb.append(uuid.substring(2, 4)); + sb.append(uuid.charAt(0)); + sb.append(uuid.charAt(1)); + sb.append(uuid.charAt(2)); sb.append('/'); - sb.append(uuid.substring(5, 7)); + sb.append(uuid.charAt(3)); + sb.append(uuid.charAt(5)); + sb.append(uuid.charAt(6)); sb.append("/Job_"); sb.append(uuid.substring(8, 17)); } Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1022833&r1=1022832&r2=1022833&view=diff ============================================================================== --- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original) +++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Fri Oct 15 06:53:29 2010 @@ -158,9 +158,9 @@ public class InternalQueueConfiguration } if ( value != null && value.length() > 0 ) { if ( value.endsWith(".") ) { - newMatchers[i] = new PackageMatcher(value.substring(0, value.length() - 1)); + newMatchers[i] = new PackageMatcher(value); } else if ( value.endsWith("*") ) { - newMatchers[i] = new SubPackageMatcher(value.substring(0, value.length() - 1)); + newMatchers[i] = new SubPackageMatcher(value); } else { newMatchers[i] = new ClassMatcher(value); } @@ -404,37 +404,71 @@ public class InternalQueueConfiguration ", isValid=" + this.isValid() + "}"; } + /** + * Internal interface for topic matching + */ private static interface Matcher { + /** Check if the topic matches and return the variable part - null if not matching. */ String match(String topic); } + + /** Package matcher - the topic must be in the same package. */ private static final class PackageMatcher implements Matcher { + private final String packageName; public PackageMatcher(final String name) { - this.packageName = name; + // remove last char and maybe a trailing slash + int lastPos = name.length() - 1; + if ( lastPos > 0 && name.charAt(lastPos - 1) == '/' ) { + lastPos--; + } + this.packageName = name.substring(0, lastPos); } + + /** + * @see org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Matcher#match(java.lang.String) + */ public String match(final String topic) { final int pos = topic.lastIndexOf('/'); return pos > -1 && topic.substring(0, pos).equals(packageName) ? topic.substring(pos + 1) : null; } } + + /** Sub package matcher - the topic must be in the same package or a sub package. */ private static final class SubPackageMatcher implements Matcher { private final String packageName; public SubPackageMatcher(final String name) { - this.packageName = name + '/'; + // remove last char and maybe a trailing slash + int lastPos = name.length() - 1; + if ( lastPos > 0 && name.charAt(lastPos - 1) == '/' ) { + this.packageName = name.substring(0, lastPos); + } else { + this.packageName = name.substring(0, lastPos) + '/'; + } } + + /** + * @see org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Matcher#match(java.lang.String) + */ public String match(final String topic) { final int pos = topic.lastIndexOf('/'); return pos > -1 && topic.substring(0, pos + 1).startsWith(this.packageName) ? topic.substring(this.packageName.length()) : null; } } + + /** The topic must match exactly. */ private static final class ClassMatcher implements Matcher { private final String className; public ClassMatcher(final String name) { this.className = name; } + + /** + * @see org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Matcher#match(java.lang.String) + */ public String match(String topic) { return this.className.equals(topic) ? "" : null; } Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java?rev=1022833&r1=1022832&r2=1022833&view=diff ============================================================================== --- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java (original) +++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java Fri Oct 15 06:53:29 2010 @@ -177,7 +177,7 @@ public class WebConsolePlugin extends Ht pw.printf("<tr><td>Processed Jobs</td><td>%s</td><td colspan='2'> </td></tr>", s.getNumberOfProcessedJobs()); pw.printf("<tr><td>Average Processing Time</td><td>%s</td><td colspan='2'> </td></tr>", formatTime(s.getAverageProcessingTime())); pw.printf("<tr><td>Average Waiting Time</td><td>%s</td><td colspan='2'> </td></tr>", formatTime(s.getAverageWaitingTime())); - pw.printf("<tr><td>Status Info</td><td colspan='3'>%s</td></tr>", escape(q.getStatusInfo())); + pw.printf("<tr><td>Status Info</td><td colspan='3'>%s</td></tr>", escape(q.getStateInfo())); pw.println("</tbody></table>"); pw.println("<br/>"); } @@ -357,7 +357,7 @@ public class WebConsolePlugin extends Ht pw.printf("Processed Jobs : %s%n", s.getNumberOfProcessedJobs()); pw.printf("Average Processing Time : %s%n", formatTime(s.getAverageProcessingTime())); pw.printf("Average Waiting Time : %s%n", formatTime(s.getAverageWaitingTime())); - pw.printf("Status Info : %s%n", q.getStatusInfo()); + pw.printf("Status Info : %s%n", q.getStateInfo()); pw.println("Configuration"); pw.printf("Type : %s%n", formatType(c.getType())); pw.printf("Topics : %s%n", formatArrayAsText(c.getTopics())); Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1022833&r1=1022832&r2=1022833&view=diff ============================================================================== --- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original) +++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Fri Oct 15 06:53:29 2010 @@ -19,6 +19,7 @@ package org.apache.sling.event.impl.jobs.queues; import java.util.ArrayList; +import java.util.Collection; import java.util.Dictionary; import java.util.HashMap; import java.util.Iterator; @@ -105,7 +106,10 @@ public abstract class AbstractJobQueue this.environment = environment; } - public String getStatusInfo() { + /** + * @see org.apache.sling.event.jobs.Queue#getStateInfo() + */ + public String getStateInfo() { return "isWaiting=" + this.isWaiting + ", markedForCleanUp=" + this.markedForCleanUp + ", suspendedSince=" + this.suspendedSince.longValue(); } @@ -504,16 +508,6 @@ public abstract class AbstractJobQueue this.queueName = name; } - protected abstract JobEvent start(final JobEvent event); - - protected abstract void put(final JobEvent event); - - protected abstract JobEvent take(); - - protected abstract boolean isEmpty(); - - protected abstract void notifyFinished(final JobEvent rescheduleInfo); - /** * Reschedule a job. */ @@ -556,16 +550,27 @@ public abstract class AbstractJobQueue /** * @see org.apache.sling.event.jobs.Queue#removeAll() */ - public void removeAll() { + public synchronized void removeAll() { + // we suspend the queue final boolean wasSuspended = this.isSuspended(); this.suspend(); - while ( !this.isEmpty() ) { - final JobEvent event = this.take(); - if ( event != null ) { - event.remove(); - } - } + // we copy all events and remove them in the background + final Collection<JobEvent> events = this.removeAllJobs(); this.clearQueued(); + final Thread t = new Thread(new Runnable() { + + /** + * @see java.lang.Runnable#run() + */ + public void run() { + for(final JobEvent job : events) { + job.remove(); + } + } + }, "Queue RemoveAll Thread for " + this.queueName); + t.setDaemon(true); + t.start(); + // start queue again if ( !wasSuspended ) { this.resume(); } @@ -577,5 +582,37 @@ public abstract class AbstractJobQueue public void clear() { this.clearQueued(); } + + /** + * @see org.apache.sling.event.jobs.Queue#getState(java.lang.String) + */ + public Object getState(final String key) { + // not supported for now + return null; + } + + /** + * Put another job into the queue. + */ + protected abstract void put(final JobEvent event); + + /** + * Get another job from the queue. + */ + protected abstract JobEvent take(); + + /** + * Is the queue empty? + */ + protected abstract boolean isEmpty(); + + /** + * Remove all events from the queue and return them. + */ + protected abstract Collection<JobEvent> removeAllJobs(); + + protected abstract JobEvent start(final JobEvent event); + + protected abstract void notifyFinished(final JobEvent rescheduleInfo); } Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java?rev=1022833&r1=1022832&r2=1022833&view=diff ============================================================================== --- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java (original) +++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java Fri Oct 15 06:53:29 2010 @@ -45,8 +45,8 @@ public abstract class AbstractParallelJo } @Override - public String getStatusInfo() { - return super.getStatusInfo() + ", jobCount=" + this.jobCount; + public String getStateInfo() { + return super.getStateInfo() + ", jobCount=" + this.jobCount; } @Override Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1022833&r1=1022832&r2=1022833&view=diff ============================================================================== --- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original) +++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Fri Oct 15 06:53:29 2010 @@ -18,6 +18,9 @@ */ package org.apache.sling.event.impl.jobs.queues; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -38,7 +41,7 @@ public final class OrderedJobQueue exten private JobEvent jobEvent; /** Marker indicating that this queue is currently sleeping. */ - private volatile boolean isSleeping = false; + private volatile long isSleepingUntil = -1; /** The sleeping thread. */ private volatile Thread sleepingThread; @@ -53,8 +56,8 @@ public final class OrderedJobQueue exten } @Override - public String getStatusInfo() { - return super.getStatusInfo() + ", isSleeping=" + this.isSleeping; + public String getStateInfo() { + return super.getStateInfo() + ", isSleepingUntil=" + this.isSleepingUntil; } @Override @@ -68,21 +71,19 @@ public final class OrderedJobQueue exten return rescheduleInfo; } - private void setSleeping(boolean flag) { - this.isSleeping = flag; - if ( !flag ) { - this.sleepingThread = null; - } + private void setNotSleeping() { + this.isSleepingUntil = -1; + this.sleepingThread = null; } - private void setSleeping(Thread sleepingThread) { + private void setSleeping(final Thread sleepingThread, final long delay) { this.sleepingThread = sleepingThread; - this.setSleeping(true); + this.isSleepingUntil = System.currentTimeMillis() + delay; } @Override public void resume() { - if ( this.isSleeping ) { + if ( this.isSleepingUntil == -1 ) { final Thread thread = this.sleepingThread; if ( thread != null ) { thread.interrupt(); @@ -158,14 +159,14 @@ public final class OrderedJobQueue exten delay = (Long)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY); } if ( delay > 0 ) { - setSleeping(Thread.currentThread()); + this.setSleeping(Thread.currentThread(), delay); try { this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay); Thread.sleep(delay); } catch (InterruptedException e) { this.ignoreException(e); } finally { - setSleeping(false); + this.setNotSleeping(); } } return info; @@ -180,9 +181,24 @@ public final class OrderedJobQueue exten } @Override - public void removeAll() { + public synchronized void removeAll() { this.jobEvent = null; super.removeAll(); } + + @Override + protected Collection<JobEvent> removeAllJobs() { + final List<JobEvent> events = new ArrayList<JobEvent>(this.queue); + this.queue.clear(); + return events; + } + + @Override + public Object getState(final String key) { + if ( "isSleepingUntil".equals(key) ) { + return this.isSleepingUntil; + } + return super.getState(key); + } } Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1022833&r1=1022832&r2=1022833&view=diff ============================================================================== --- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (original) +++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Fri Oct 15 06:53:29 2010 @@ -18,6 +18,9 @@ */ package org.apache.sling.event.impl.jobs.queues; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -75,5 +78,12 @@ public final class ParallelJobQueue exte this.queue.clear(); super.clear(); } + + @Override + protected Collection<JobEvent> removeAllJobs() { + final List<JobEvent> events = new ArrayList<JobEvent>(this.queue); + this.queue.clear(); + return events; + } } Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1022833&r1=1022832&r2=1022833&view=diff ============================================================================== --- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java (original) +++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java Fri Oct 15 06:53:29 2010 @@ -19,6 +19,7 @@ package org.apache.sling.event.impl.jobs.queues; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -59,8 +60,8 @@ public final class TopicRoundRobinJobQue } @Override - public String getStatusInfo() { - return super.getStatusInfo() + ", eventCount=" + this.eventCount + ", isWaitingForNext=" + this.isWaitingForNext; + public String getStateInfo() { + return super.getStateInfo() + ", eventCount=" + this.eventCount + ", isWaitingForNext=" + this.isWaitingForNext; } @Override @@ -143,5 +144,19 @@ public final class TopicRoundRobinJobQue } super.clear(); } + + @Override + protected Collection<JobEvent> removeAllJobs() { + final List<JobEvent> events = new ArrayList<JobEvent>(); + synchronized ( this.topicMap ) { + for(final List<JobEvent> l : this.topicMap.values() ) { + events.addAll(l); + } + this.eventCount = 0; + this.topics.clear(); + this.topicMap.clear(); + } + return events; + } } Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java?rev=1022833&r1=1022832&r2=1022833&view=diff ============================================================================== --- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java (original) +++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java Fri Oct 15 06:53:29 2010 @@ -36,11 +36,6 @@ public interface Queue { Statistics getStatistics(); /** - * Return some information about the current status of the queue. - */ - String getStatusInfo(); - - /** * Get the corresponding configuration. */ QueueConfiguration getConfiguration(); @@ -83,4 +78,17 @@ public interface Queue { * all outstanding jobs (but no notifications are send). */ void removeAll(); + + /** + * Return some information about the current state of the queue. This + * method is meant to see the internal state of the queue for debugging + * or monitoring purposes. + */ + String getStateInfo(); + + /** + * For monitoring purposes and possible extensions from the different + * queue types. This method allows to query state information. + */ + Object getState(final String key); } Modified: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java?rev=1022833&r1=1022832&r2=1022833&view=diff ============================================================================== --- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java (original) +++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java Fri Oct 15 06:53:29 2010 @@ -77,7 +77,7 @@ public class InternalQueueConfigurationT assertFalse(c.match(getJobEvent("t/x"))); } - @org.junit.Test public void testTopicMatchersStart() { + @org.junit.Test public void testTopicMatchersStar() { final Map<String, Object> p = new HashMap<String, Object>(); p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a*"}); p.put(ConfigurationConstants.PROP_NAME, "test"); @@ -122,6 +122,51 @@ public class InternalQueueConfigurationT assertEquals("test-queue-d", d.queueName); } + @org.junit.Test public void testTopicMatchersDotAndSlash() { + final Map<String, Object> p = new HashMap<String, Object>(); + p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a/."}); + p.put(ConfigurationConstants.PROP_NAME, "test"); + + InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p); + assertTrue(c.isValid()); + assertTrue(c.match(getJobEvent("a/b"))); + assertTrue(c.match(getJobEvent("a/c"))); + assertFalse(c.match(getJobEvent("a"))); + assertFalse(c.match(getJobEvent("a/b/c"))); + assertFalse(c.match(getJobEvent("t"))); + assertFalse(c.match(getJobEvent("t/x"))); + } + + @org.junit.Test public void testTopicMatchersStarAndSlash() { + final Map<String, Object> p = new HashMap<String, Object>(); + p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a/*"}); + p.put(ConfigurationConstants.PROP_NAME, "test"); + + InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p); + assertTrue(c.isValid()); + assertTrue(c.match(getJobEvent("a/b"))); + assertTrue(c.match(getJobEvent("a/c"))); + assertFalse(c.match(getJobEvent("a"))); + assertTrue(c.match(getJobEvent("a/b/c"))); + assertFalse(c.match(getJobEvent("t"))); + assertFalse(c.match(getJobEvent("t/x"))); + } + + @org.junit.Test public void testTopicMatcherAndReplacementAndSlash() { + final Map<String, Object> p = new HashMap<String, Object>(); + p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a/."}); + p.put(ConfigurationConstants.PROP_NAME, "test-queue-{0}"); + + InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p); + assertTrue(c.isValid()); + final JobEvent b = getJobEvent("a/b"); + assertTrue(c.match(b)); + assertEquals("test-queue-b", b.queueName); + final JobEvent d = getJobEvent("a/d"); + assertTrue(c.match(d)); + assertEquals("test-queue-d", d.queueName); + } + @org.junit.Test public void testNoTopicMatchers() { final Map<String, Object> p = new HashMap<String, Object>(); p.put(ConfigurationConstants.PROP_NAME, "test");