Author: cziegeler Date: Fri Feb 19 19:11:26 2010 New Revision: 911940 URL: http://svn.apache.org/viewvc?rev=911940&view=rev Log: SLING-1397 : Add an acknowledge method to EventUtil
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java (with props) Modified: sling/trunk/bundles/extensions/event/pom.xml sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java Modified: sling/trunk/bundles/extensions/event/pom.xml URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=911940&r1=911939&r2=911940&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/pom.xml (original) +++ sling/trunk/bundles/extensions/event/pom.xml Fri Feb 19 19:11:26 2010 @@ -60,7 +60,7 @@ javax.jcr.*;version=1.0,* </Import-Package> <Export-Package> - org.apache.sling.event;version=2.2.0 + org.apache.sling.event;version=2.3.0 </Export-Package> <Private-Package> org.apache.sling.event.impl, Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=911940&r1=911939&r2=911940&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java Fri Feb 19 19:11:26 2010 @@ -259,8 +259,30 @@ } /** + * Send an acknowledge. + * This signals the job handler that someone is starting to process the job. This method + * should be invoked as a first command during job processing. + * If this method returns <code>false</code> this means someone else is already + * processing this job, and the caller should not process the event anymore. + * @return Returns <code>true</code> if the acknowledge could be sent + * @since 2.3 + */ + public static boolean acknowledgeJob(Event job) { + final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job); + if ( ctx != null ) { + if ( !ctx.notifier.sendAcknowledge(job, ctx.eventNodePath) ) { + // if we don't get an ack, someone else is already processing this job. + // we process but do not notify the job event handler. + LoggerFactory.getLogger(EventUtil.class).info("Someone else is already processing job {}.", job); + return false; + } + return true; + } + return false; + } + + /** * Notify a finished job. - * @throws IllegalArgumentException If the event is a job event but does not have a notifier context. */ public static void finishedJob(Event job) { final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job); @@ -272,7 +294,6 @@ /** * Notify a failed job. * @return <code>true</code> if the job has been rescheduled, <code>false</code> otherwise. - * @throws IllegalArgumentException If the event is a job event but does not have a notifier context. */ public static boolean rescheduleJob(Event job) { final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job); Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=911940&r1=911939&r2=911940&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Fri Feb 19 19:11:26 2010 @@ -164,7 +164,7 @@ /** The scheduler for rescheduling jobs. * @scr.reference */ - private Scheduler scheduler; + protected Scheduler scheduler; /** Our component context. */ private ComponentContext componentContext; Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java?rev=911940&r1=911939&r2=911940&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java Fri Feb 19 19:11:26 2010 @@ -229,10 +229,10 @@ final Date fireDate = new Date(); fireDate.setTime(System.currentTimeMillis() + delay); - final String schedulerJobName = "Waiting:" + queueName; + final String jobName = "Waiting:" + queueName; final Runnable t = new Runnable() { public void run() { - setSleeping(schedulerJobName); + setSleeping(jobName); try { put(info); } catch (InterruptedException e) { @@ -245,7 +245,7 @@ }; if ( scheduler != null ) { try { - scheduler.fireJobAt(schedulerJobName, t, null, fireDate); + scheduler.fireJobAt(jobName, t, null, fireDate); } catch (Exception e) { // we ignore the exception and just put back the job in the queue ignoreException(e); Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java?rev=911940&r1=911939&r2=911940&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java (original) +++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java Fri Feb 19 19:11:26 2010 @@ -24,6 +24,7 @@ import java.util.Calendar; import java.util.Dictionary; +import java.util.Hashtable; import java.util.Properties; import javax.jcr.PropertyType; @@ -56,6 +57,15 @@ assertTrue(EventUtil.shouldDistribute(distributableEvent)); final Event nonDistributableEvent = new Event("another/topic", (Dictionary<String, Object>)null); assertFalse(EventUtil.shouldDistribute(nonDistributableEvent)); + final Dictionary<String, Object> props = new Hashtable<String, Object>(); + props.put("a", "a"); + props.put("b", "b"); + final Event distributableEvent2 = EventUtil.createDistributableEvent("some/topic", props); + assertTrue(EventUtil.shouldDistribute(distributableEvent2)); + // we should have four properties: 2 custom, one for the dist flag and the fourth for the topic + assertEquals(4, distributableEvent2.getPropertyNames().length); + assertEquals("a", distributableEvent2.getProperty("a")); + assertEquals("b", distributableEvent2.getProperty("b")); } @Test public void testLocalFlag() { Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java?rev=911940&r1=911939&r2=911940&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java (original) +++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java Fri Feb 19 19:11:26 2010 @@ -23,8 +23,11 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Collections; import java.util.Dictionary; import java.util.Hashtable; +import java.util.List; import javax.jcr.RepositoryException; import javax.jcr.observation.EventListenerIterator; @@ -45,6 +48,7 @@ public JobEventHandlerTest() { this.handler = new JobEventHandler(); this.context = new JUnit4Mockery(); + ((JobEventHandler)this.handler).scheduler = new SimpleScheduler(); } @Override @@ -72,11 +76,20 @@ /** * Helper method to create a job event. */ - private Event getJobEvent() { + private Event getJobEvent(String queueName, String id, String parallel) { final Dictionary<String, Object> props = new Hashtable<String, Object>(); props.put(EventUtil.PROPERTY_JOB_TOPIC, "sling/test"); + if ( id != null ) { + props.put(EventUtil.PROPERTY_JOB_ID, id); + } props.put(EventUtil.PROPERTY_JOB_RETRY_DELAY, 2000L); props.put(EventUtil.PROPERTY_JOB_RETRIES, 2); + if ( queueName != null ) { + props.put(EventUtil.PROPERTY_JOB_QUEUE_NAME, queueName); + } + if ( parallel != null ) { + props.put(EventUtil.PROPERTY_JOB_PARALLEL, parallel); + } return new Event(EventUtil.TOPIC_JOB, props); } @@ -86,18 +99,43 @@ */ @org.junit.Test public void testSimpleJobExecution() throws Exception { final JobEventHandler jeh = (JobEventHandler)this.handler; - jeh.handleEvent(getJobEvent()); final Barrier cb = new Barrier(2); jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"}, new EventHandler[] { new EventHandler() { public void handleEvent(Event event) { + EventUtil.acknowledgeJob(event); + EventUtil.finishedJob(event); + cb.block(); + } + + } + }); + jeh.handleEvent(getJobEvent(null, null, null)); + assertTrue("No event received in the given time.", cb.block(5)); + cb.reset(); + assertFalse("Unexpected event received in the given time.", cb.block(5)); + } + + /** + * Test simple job execution with job id. + * The job is executed once and finished successfully. + */ + @org.junit.Test public void testSimpleJobWithIdExecution() throws Exception { + final JobEventHandler jeh = (JobEventHandler)this.handler; + final Barrier cb = new Barrier(2); + jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"}, + new EventHandler[] { + new EventHandler() { + public void handleEvent(Event event) { + EventUtil.acknowledgeJob(event); EventUtil.finishedJob(event); cb.block(); } } }); + jeh.handleEvent(getJobEvent(null, "myid", null)); assertTrue("No event received in the given time.", cb.block(5)); cb.reset(); assertFalse("Unexpected event received in the given time.", cb.block(5)); @@ -108,18 +146,29 @@ * The job is rescheduled two times before it fails. */ @org.junit.Test public void testStartJobAndReschedule() throws Exception { + final List<Integer> retryCountList = new ArrayList<Integer>(); final JobEventHandler jeh = (JobEventHandler)this.handler; - jeh.handleEvent(getJobEvent()); final Barrier cb = new Barrier(2); jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"}, new EventHandler[] { new EventHandler() { + int retryCount; public void handleEvent(Event event) { + EventUtil.acknowledgeJob(event); + int retry = 0; + if ( event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) { + retry = (Integer)event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT); + } + if ( retry == retryCount ) { + retryCountList.add(retry); + } + retryCount++; EventUtil.rescheduleJob(event); cb.block(); } } }); + jeh.handleEvent(getJobEvent(null, null, null)); assertTrue("No event received in the given time.", cb.block(5)); cb.reset(); // the job is retried after two seconds, so we wait again @@ -130,6 +179,147 @@ // we have reached the retry so we expect to not get an event cb.reset(); assertFalse("Unexpected event received in the given time.", cb.block(5)); + assertEquals("Unexpected number of retries", 3, retryCountList.size()); } + /** + * Reschedule test. + * The job is rescheduled two times before it fails. + */ + @org.junit.Test public void testStartJobAndRescheduleInJobQueue() throws Exception { + final List<Integer> retryCountList = new ArrayList<Integer>(); + final Barrier cb = new Barrier(2); + final JobEventHandler jeh = (JobEventHandler)this.handler; + jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"}, + new EventHandler[] { + new EventHandler() { + int retryCount; + public void handleEvent(Event event) { + EventUtil.acknowledgeJob(event); + int retry = 0; + if ( event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) { + retry = (Integer)event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT); + } + if ( retry == retryCount ) { + retryCountList.add(retry); + } + retryCount++; + EventUtil.rescheduleJob(event); + cb.block(); + } + } + }); + jeh.handleEvent(getJobEvent("testqueue", null, null)); + assertTrue("No event received in the given time.", cb.block(5)); + cb.reset(); + // the job is retried after two seconds, so we wait again + assertTrue("No event received in the given time.", cb.block(5)); + cb.reset(); + // the job is retried after two seconds, so we wait again + assertTrue("No event received in the given time.", cb.block(5)); + // we have reached the retry so we expect to not get an event + cb.reset(); + assertFalse("Unexpected event received in the given time.", cb.block(5)); + assertEquals("Unexpected number of retries", 3, retryCountList.size()); + } + + /** + * Notifications. + * We send several jobs which are treated different and then see + * how many invocations have been sent. + */ + @org.junit.Test public void testNotifications() throws Exception { + final List<String> cancelled = Collections.synchronizedList(new ArrayList<String>()); + final List<String> failed = Collections.synchronizedList(new ArrayList<String>()); + final List<String> finished = Collections.synchronizedList(new ArrayList<String>()); + final List<String> started = Collections.synchronizedList(new ArrayList<String>()); + final JobEventHandler jeh = (JobEventHandler)this.handler; + jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test", + EventUtil.TOPIC_JOB_CANCELLED, + EventUtil.TOPIC_JOB_FAILED, + EventUtil.TOPIC_JOB_FINISHED, + EventUtil.TOPIC_JOB_STARTED}, + new EventHandler[] { + new EventHandler() { + public void handleEvent(final Event event) { + EventUtil.acknowledgeJob(event); + // events 1 and 4 finish the first time + final String id = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID); + if ( "1".equals(id) || "4".equals(id) ) { + EventUtil.finishedJob(event); + } else + // 5 fails always + if ( "5".equals(id) ) { + EventUtil.rescheduleJob(event); + } + int retry = 0; + if ( event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) { + retry = (Integer)event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT); + } + // 2 fails the first time + if ( "2".equals(id) ) { + if ( retry == 0 ) { + EventUtil.rescheduleJob(event); + } else { + EventUtil.finishedJob(event); + } + } + // 3 fails the first and second time + if ( "3".equals(id) ) { + if ( retry == 0 || retry == 1 ) { + EventUtil.rescheduleJob(event); + } else { + EventUtil.finishedJob(event); + } + } + } + }, + new EventHandler() { + public void handleEvent(final Event event) { + final Event job = (Event) event.getProperty(EventUtil.PROPERTY_NOTIFICATION_JOB); + final String id = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID); + cancelled.add(id); + } + }, + new EventHandler() { + public void handleEvent(final Event event) { + final Event job = (Event) event.getProperty(EventUtil.PROPERTY_NOTIFICATION_JOB); + final String id = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID); + failed.add(id); + } + }, + new EventHandler() { + public void handleEvent(final Event event) { + final Event job = (Event) event.getProperty(EventUtil.PROPERTY_NOTIFICATION_JOB); + final String id = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID); + finished.add(id); + } + }, + new EventHandler() { + public void handleEvent(final Event event) { + final Event job = (Event) event.getProperty(EventUtil.PROPERTY_NOTIFICATION_JOB); + final String id = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID); + started.add(id); + } + } + }); + jeh.handleEvent(getJobEvent(null, "1", "true")); + jeh.handleEvent(getJobEvent(null, "2", "true")); + jeh.handleEvent(getJobEvent(null, "3", "true")); + jeh.handleEvent(getJobEvent(null, "4", "true")); + jeh.handleEvent(getJobEvent(null, "5", "true")); + int count = 0; + final long startTime = System.currentTimeMillis(); + do { + count = finished.size() + cancelled.size(); + // after 25 seconds we cancel the test + if ( System.currentTimeMillis() - startTime > 25000 ) { + throw new Exception("Timeout during notification test."); + } + } while ( count < 5); + assertEquals("Finished count", 4, finished.size()); + assertEquals("Cancelled count", 1, cancelled.size()); + assertEquals("Started count", 10, started.size()); + assertEquals("Failed count", 5, failed.size()); + } } Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java?rev=911940&view=auto ============================================================================== --- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java (added) +++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java Fri Feb 19 19:11:26 2010 @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl; + +import java.io.Serializable; +import java.util.Date; +import java.util.Map; +import java.util.NoSuchElementException; + +import org.apache.sling.commons.scheduler.Scheduler; + +/** + * Simple scheduler implementation for testing. + */ +public class SimpleScheduler implements Scheduler { + + public void addJob(String name, Object job, + Map<String, Serializable> config, String schedulingExpression, + boolean canRunConcurrently) throws Exception { + throw new IllegalArgumentException(); + } + + public void addPeriodicJob(String name, Object job, + Map<String, Serializable> config, long period, + boolean canRunConcurrently) throws Exception { + throw new IllegalAccessException(); + } + + public boolean fireJob(Object job, Map<String, Serializable> config, + int times, long period) { + throw new IllegalArgumentException(); + } + + public void fireJob(Object job, Map<String, Serializable> config) + throws Exception { + throw new IllegalAccessException(); + } + + public boolean fireJobAt(String name, Object job, + Map<String, Serializable> config, Date date, int times, long period) { + throw new IllegalArgumentException(); + } + + public void fireJobAt(String name, final Object job, + Map<String, Serializable> config, final Date date) throws Exception { + new Thread() { + public void run() { + final long sleepTime = date.getTime() - System.currentTimeMillis(); + if ( sleepTime > 0 ) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + // ignore + } + } + ((Runnable)job).run(); + } + }.start(); + } + + public void removeJob(String name) throws NoSuchElementException { + // ignore this + } +} Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java ------------------------------------------------------------------------------ svn:mime-type = text/plain