Author: cziegeler Date: Wed Apr 21 12:44:53 2010 New Revision: 936287 URL: http://svn.apache.org/viewvc?rev=936287&view=rev Log: SLING-1494 : Update to JCR 2 API
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java?rev=936287&r1=936286&r2=936287&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java Wed Apr 21 12:44:53 2010 @@ -38,6 +38,8 @@ public class EventPropertiesMap extends Dictionary<String, Object> implements Map<String, Object>, Serializable { + private static final long serialVersionUID = 835179638502569708L; + private final Map<String, Object> delegatee; /** Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java?rev=936287&r1=936286&r2=936287&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java Wed Apr 21 12:44:53 2010 @@ -27,13 +27,13 @@ import javax.jcr.RepositoryException; import javax.jcr.Session; import javax.jcr.observation.EventIterator; import javax.jcr.query.Query; +import javax.jcr.query.qom.QueryObjectModelFactory; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.ConfigurationPolicy; import org.apache.felix.scr.annotations.Properties; import org.apache.felix.scr.annotations.Property; import org.apache.felix.scr.annotations.Service; -import org.apache.jackrabbit.util.ISO8601; import org.apache.sling.commons.osgi.OsgiUtil; import org.apache.sling.event.EventUtil; import org.osgi.service.component.ComponentContext; @@ -83,24 +83,26 @@ public class DistributingEventHandler } /** - * Return the query string for the clean up. + * Return the query for the clean up. */ - protected String getCleanUpQueryString() { + protected Query getCleanUpQuery(final Session s) + throws RepositoryException { + final String selectorName = "nodetype"; final Calendar deleteBefore = Calendar.getInstance(); deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod); - final String dateString = ISO8601.format(deleteBefore); - final StringBuilder buffer = new StringBuilder("/jcr:root"); - buffer.append(this.repositoryPath); - buffer.append("//element(*, "); - buffer.append(getEventNodeType()); - buffer.append(")[@"); - buffer.append(EventHelper.NODE_PROPERTY_CREATED); - buffer.append(" < xs:dateTime('"); - buffer.append(dateString); - buffer.append("')]"); + final QueryObjectModelFactory qomf = s.getWorkspace().getQueryManager().getQOMFactory(); - return buffer.toString(); + final Query q = qomf.createQuery( + qomf.selector(getEventNodeType(), selectorName), + qomf.and(qomf.descendantNode(selectorName, this.repositoryPath), + qomf.comparison(qomf.propertyValue(selectorName, EventHelper.NODE_PROPERTY_CREATED), + QueryObjectModelFactory.JCR_OPERATOR_LESS_THAN, + qomf.literal(s.getValueFactory().createValue(deleteBefore)))), + null, + null + ); + return q; } /** @@ -111,14 +113,14 @@ public class DistributingEventHandler if ( this.cleanupPeriod > 0 ) { this.logger.debug("Cleaning up repository, removing all entries older than {} minutes.", this.cleanupPeriod); - final String queryString = this.getCleanUpQueryString(); // we create an own session for concurrency issues Session s = null; try { s = this.createSession(); - final Node parentNode = (Node)s.getItem(this.repositoryPath); - logger.debug("Executing query {}", queryString); - final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH); + final Query q = this.getCleanUpQuery(s); + if ( logger.isDebugEnabled() ) { + logger.debug("Executing query {}", q.getStatement()); + } final NodeIterator iter = q.execute().getNodes(); int count = 0; while ( iter.hasNext() ) { @@ -126,7 +128,7 @@ public class DistributingEventHandler eventNode.remove(); count++; } - parentNode.save(); + s.save(); logger.debug("Removed {} entries from the repository.", count); } catch (RepositoryException e) { Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java?rev=936287&r1=936286&r2=936287&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java Wed Apr 21 12:44:53 2010 @@ -233,7 +233,8 @@ public abstract class EventHelper { } } oos.close(); - node.setProperty(EventHelper.NODE_PROPERTY_PROPERTIES, new ByteArrayInputStream(baos.toByteArray())); + node.setProperty(EventHelper.NODE_PROPERTY_PROPERTIES, + node.getSession().getValueFactory().createBinary(new ByteArrayInputStream(baos.toByteArray()))); } catch (IOException ioe) { throw new RepositoryException("Unable to serialize event " + EventUtil.toString(event), ioe); } @@ -257,7 +258,7 @@ public abstract class EventHelper { // check the properties blob if ( node.hasProperty(EventHelper.NODE_PROPERTY_PROPERTIES) ) { try { - final ObjectInputStream ois = new ObjectInputStream(node.getProperty(EventHelper.NODE_PROPERTY_PROPERTIES).getStream(), + final ObjectInputStream ois = new ObjectInputStream(node.getProperty(EventHelper.NODE_PROPERTY_PROPERTIES).getBinary().getStream(), objectClassLoader); int length = ois.readInt(); for(int i=0;i<length;i++) { Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=936287&r1=936286&r2=936287&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Apr 21 12:44:53 2010 @@ -38,16 +38,20 @@ import javax.jcr.NodeIterator; import javax.jcr.RepositoryException; import javax.jcr.Session; import javax.jcr.Value; +import javax.jcr.ValueFactory; import javax.jcr.observation.EventIterator; import javax.jcr.query.Query; import javax.jcr.query.QueryManager; +import javax.jcr.query.qom.Comparison; +import javax.jcr.query.qom.Constraint; +import javax.jcr.query.qom.Ordering; +import javax.jcr.query.qom.QueryObjectModelFactory; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Properties; import org.apache.felix.scr.annotations.Property; import org.apache.felix.scr.annotations.Service; import org.apache.felix.scr.annotations.Services; -import org.apache.jackrabbit.util.ISO8601; import org.apache.sling.commons.osgi.OsgiUtil; import org.apache.sling.commons.scheduler.Scheduler; import org.apache.sling.commons.threads.ThreadPool; @@ -304,24 +308,26 @@ public class JobEventHandler } /** - * Return the query string for the clean up. + * Return the query for the clean up. */ - private String getCleanUpQueryString() { + private Query getCleanUpQuery(final Session s) + throws RepositoryException { + final String selectorName = "nodetype"; final Calendar deleteBefore = Calendar.getInstance(); deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod); - final String dateString = ISO8601.format(deleteBefore); - final StringBuilder buffer = new StringBuilder("/jcr:root"); - buffer.append(this.repositoryPath); - buffer.append("//element(*, "); - buffer.append(getEventNodeType()); - buffer.append(")[@"); - buffer.append(EventHelper.NODE_PROPERTY_FINISHED); - buffer.append(" < xs:dateTime('"); - buffer.append(dateString); - buffer.append("')]"); + final QueryObjectModelFactory qomf = s.getWorkspace().getQueryManager().getQOMFactory(); - return buffer.toString(); + final Query q = qomf.createQuery( + qomf.selector(getEventNodeType(), selectorName), + qomf.and(qomf.descendantNode(selectorName, this.repositoryPath), + qomf.comparison(qomf.propertyValue(selectorName, EventHelper.NODE_PROPERTY_FINISHED), + QueryObjectModelFactory.JCR_OPERATOR_LESS_THAN, + qomf.literal(s.getValueFactory().createValue(deleteBefore)))), + null, + null + ); + return q; } private void loadJobsInTheBackground() { @@ -375,13 +381,14 @@ public class JobEventHandler if ( this.cleanupPeriod > 0 ) { this.logger.debug("Cleaning up repository, removing all finished jobs older than {} minutes.", this.cleanupPeriod); - final String queryString = this.getCleanUpQueryString(); // we create an own session for concurrency issues Session s = null; try { s = this.createSession(); - logger.debug("Executing query {}", queryString); - final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH); + final Query q = this.getCleanUpQuery(s); + if ( logger.isDebugEnabled() ) { + logger.debug("Executing query {}", q.getStatement()); + } final NodeIterator iter = q.execute().getNodes(); int count = 0; while ( iter.hasNext() ) { @@ -833,7 +840,7 @@ public class JobEventHandler if ( !eventNode.isLocked() ) { // lock node try { - eventNode.lock(false, true); + this.backgroundSession.getWorkspace().getLockManager().lock(info.nodePath, false, true, Long.MAX_VALUE, "JobEventHandler"); } catch (RepositoryException re) { // lock failed which means that the node is locked by someone else, so we don't have to requeue return Status.FAILED; @@ -985,7 +992,7 @@ public class JobEventHandler final String nodePath = eventNode.getPath(); final Event jobEvent = this.getJobEvent(event, nodePath); eventNode.setProperty(EventHelper.NODE_PROPERTY_PROCESSOR, this.applicationId); - eventNode.save(); + eventNode.getSession().save(); final EventAdmin localEA = this.eventAdmin; if ( localEA != null ) { final StartedJobInfo jobInfo = new StartedJobInfo(jobEvent, nodePath, System.currentTimeMillis()); @@ -1015,7 +1022,7 @@ public class JobEventHandler // unlock node try { - eventNode.unlock(); + eventNode.getSession().getWorkspace().getLockManager().unlock(eventNode.getPath()); } catch (RepositoryException e) { // if unlock fails, we silently ignore this this.ignoreException(e); @@ -1130,35 +1137,34 @@ public class JobEventHandler logger.debug("Loading from repository since {} and max {}", since, maxLoad); try { final QueryManager qManager = this.backgroundSession.getWorkspace().getQueryManager(); - final StringBuilder buffer = new StringBuilder("/jcr:root"); - buffer.append(this.repositoryPath); - buffer.append("//element(*, "); - buffer.append(this.getEventNodeType()); - buffer.append(") [not(@"); - buffer.append(EventHelper.NODE_PROPERTY_FINISHED); - buffer.append(")"); + final ValueFactory vf = this.backgroundSession.getValueFactory(); + final String selectorName = "nodetype"; + final Calendar startDate = Calendar.getInstance(); + startDate.setTimeInMillis(this.startTime); + + final QueryObjectModelFactory qomf = qManager.getQOMFactory(); + + Constraint constraint = qomf.and( + qomf.descendantNode(selectorName, this.repositoryPath), + qomf.not(qomf.propertyExistence(selectorName, EventHelper.NODE_PROPERTY_FINISHED))); + constraint = qomf.and(constraint, + qomf.comparison(qomf.propertyValue(selectorName, EventHelper.NODE_PROPERTY_CREATED), + QueryObjectModelFactory.JCR_OPERATOR_LESS_THAN, + qomf.literal(vf.createValue(startDate)))); if ( since != -1 ) { final Calendar beforeDate = Calendar.getInstance(); beforeDate.setTimeInMillis(since); - final String dateString = ISO8601.format(beforeDate); - buffer.append(" and @"); - buffer.append(EventHelper.NODE_PROPERTY_CREATED); - buffer.append(" > xs:dateTime('"); - buffer.append(dateString); - buffer.append("')"); - } - final Calendar startDate = Calendar.getInstance(); - startDate.setTimeInMillis(this.startTime); - final String dateString = ISO8601.format(startDate); - buffer.append(" and @"); - buffer.append(EventHelper.NODE_PROPERTY_CREATED); - buffer.append(" < xs:dateTime('"); - buffer.append(dateString); - buffer.append("')"); - buffer.append("] order by @"); - buffer.append(EventHelper.NODE_PROPERTY_CREATED); - buffer.append(" ascending"); - final Query q = qManager.createQuery(buffer.toString(), Query.XPATH); + constraint = qomf.and(constraint, + qomf.comparison(qomf.propertyValue(selectorName, EventHelper.NODE_PROPERTY_CREATED), + QueryObjectModelFactory.JCR_OPERATOR_GREATER_THAN, + qomf.literal(vf.createValue(beforeDate)))); + } + final Query q = qomf.createQuery( + qomf.selector(getEventNodeType(), selectorName), + constraint, + new Ordering[] {qomf.ascending(qomf.propertyValue(selectorName, EventHelper.NODE_PROPERTY_CREATED))}, + null + ); final NodeIterator result = q.execute().getNodes(); long count = 0; while ( result.hasNext() && count < maxLoad ) { @@ -1348,7 +1354,7 @@ public class JobEventHandler } // unlock node try { - eventNode.unlock(); + eventNode.getSession().getWorkspace().getLockManager().unlock(eventNode.getPath()); } catch (RepositoryException e) { // if unlock fails, we silently ignore this this.ignoreException(e); @@ -1492,34 +1498,33 @@ public class JobEventHandler try { s = this.createSession(); final QueryManager qManager = s.getWorkspace().getQueryManager(); - final StringBuilder buffer = new StringBuilder("/jcr:root"); - buffer.append(this.repositoryPath); - if ( topic != null ) { - buffer.append('/'); - buffer.append(topic.replace('/', '.')); - } - buffer.append("//element(*, "); - buffer.append(this.getEventNodeType()); - buffer.append(") [not(@"); - buffer.append(EventHelper.NODE_PROPERTY_FINISHED); - buffer.append(")"); + final String selectorName = "nodetype"; + + final QueryObjectModelFactory qomf = qManager.getQOMFactory(); + + final String path; + if ( topic == null ) { + path = this.repositoryPath; + } else { + path = this.repositoryPath + '/' + topic.replace('/', '.'); + } + Constraint constraint = qomf.and(qomf.descendantNode(selectorName, path), + qomf.not(qomf.propertyExistence(selectorName, EventHelper.NODE_PROPERTY_FINISHED))); + if ( locked != null ) { if ( locked ) { - buffer.append(" and @jcr:lockOwner"); + constraint = qomf.and(constraint, + qomf.propertyExistence(selectorName, "jcr:lockOwner")); } else { - buffer.append(" and not(@jcr:lockOwner)"); + constraint = qomf.and(constraint, + qomf.not(qomf.propertyExistence(selectorName, "jcr:lockOwner"))); } } if ( filterProps != null && filterProps.length > 0 ) { - buffer.append(" and ("); - int index = 0; + Constraint orConstraint = null; for (Map<String,Object> template : filterProps) { - if ( index > 0 ) { - buffer.append(" or "); - } - buffer.append('('); + Constraint comp = null; final Iterator<Map.Entry<String, Object>> i = template.entrySet().iterator(); - boolean first = true; while ( i.hasNext() ) { final Map.Entry<String, Object> current = i.next(); // check prop name first @@ -1528,32 +1533,39 @@ public class JobEventHandler // check value final Value value = EventHelper.getNodePropertyValue(s.getValueFactory(), current.getValue()); if ( value != null ) { - if ( first ) { - first = false; - buffer.append('@'); + final Comparison newComp = qomf.comparison(qomf.propertyValue(selectorName, propName), + QueryObjectModelFactory.JCR_OPERATOR_EQUAL_TO, + qomf.literal(value)); + if ( comp == null ) { + comp = newComp; } else { - buffer.append(" and @"); + comp = qomf.and(comp, newComp); } - buffer.append(propName); - buffer.append(" = '"); - buffer.append(current.getValue()); - buffer.append("'"); } } } - buffer.append(')'); - index++; + if ( comp != null ) { + if ( orConstraint == null ) { + orConstraint = comp; + } else { + orConstraint = qomf.or(constraint, comp); + } + } + } + if ( orConstraint != null ) { + constraint = qomf.and(constraint, orConstraint); } - buffer.append(')'); } - buffer.append("]"); - buffer.append(" order by @"); - buffer.append(EventHelper.NODE_PROPERTY_CREATED); - buffer.append(" ascending"); - final String queryString = buffer.toString(); - logger.debug("Executing job query {}.", queryString); + final Query q = qomf.createQuery( + qomf.selector(getEventNodeType(), selectorName), + constraint, + new Ordering[] {qomf.ascending(qomf.propertyValue(selectorName, EventHelper.NODE_PROPERTY_CREATED))}, + null + ); + if ( logger.isDebugEnabled() ) { + logger.debug("Executing job query {}.", q.getStatement()); + } - final Query q = qManager.createQuery(queryString, Query.XPATH); final NodeIterator iter = q.execute().getNodes(); while ( iter.hasNext() ) { final Node eventNode = iter.nextNode(); Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java?rev=936287&r1=936286&r2=936287&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java Wed Apr 21 12:44:53 2010 @@ -44,6 +44,9 @@ import javax.jcr.lock.LockException; import javax.jcr.observation.EventIterator; import javax.jcr.query.Query; import javax.jcr.query.QueryManager; +import javax.jcr.query.qom.Comparison; +import javax.jcr.query.qom.Constraint; +import javax.jcr.query.qom.QueryObjectModelFactory; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Properties; @@ -173,7 +176,7 @@ public class TimedJobHandler // lock node Lock lock = null; try { - lock = eventNode.lock(false, true); + lock = eventNode.getSession().getWorkspace().getLockManager().lock(info.nodePath, false, true, Long.MAX_VALUE, "TimedJobHandler"); } catch (RepositoryException re) { // lock failed which means that the node is locked by someone else, so we don't have to requeue } @@ -238,7 +241,7 @@ public class TimedJobHandler // write event to repository, lock it and schedule the event final Node eventNode = writeEvent(event, nodeName); - lock = eventNode.lock(false, true); + lock = eventNode.getSession().getWorkspace().getLockManager().lock(eventNode.getPath(), false, true, Long.MAX_VALUE, "TimedJobHandler"); } } @@ -246,7 +249,7 @@ public class TimedJobHandler // if something went wrong, we reschedule if ( !this.processEvent(event, scheduleInfo) ) { final String path = lock.getNode().getPath(); - lock.getNode().unlock(); + writerSession.getWorkspace().getLockManager().unlock(path); return path; } } @@ -516,12 +519,16 @@ public class TimedJobHandler protected void loadEvents() { try { final QueryManager qManager = this.writerSession.getWorkspace().getQueryManager(); - final StringBuilder buffer = new StringBuilder("/jcr:root"); - buffer.append(this.repositoryPath); - buffer.append("//element(*, "); - buffer.append(this.getEventNodeType()); - buffer.append(")"); - final Query q = qManager.createQuery(buffer.toString(), Query.XPATH); + final String selectorName = "nodetype"; + + final QueryObjectModelFactory qomf = qManager.getQOMFactory(); + + final Query q = qomf.createQuery( + qomf.selector(getEventNodeType(), selectorName), + qomf.descendantNode(selectorName, this.repositoryPath), + null, + null + ); final NodeIterator result = q.execute().getNodes(); while ( result.hasNext() ) { final Node eventNode = result.nextNode(); @@ -585,6 +592,8 @@ public class TimedJobHandler protected static final class ScheduleInfo implements Serializable { + private static final long serialVersionUID = 8667701700547811142L; + public final String expression; public final Long period; public final Date date; @@ -681,25 +690,22 @@ public class TimedJobHandler try { s = this.createSession(); final QueryManager qManager = s.getWorkspace().getQueryManager(); - final StringBuilder buffer = new StringBuilder("/jcr:root"); - buffer.append(this.repositoryPath); - if ( topic != null ) { - buffer.append('/'); - buffer.append(topic.replace('/', '.')); - } - buffer.append("//element(*, "); - buffer.append(this.getEventNodeType()); - buffer.append(")"); + final String selectorName = "nodetype"; + + final QueryObjectModelFactory qomf = qManager.getQOMFactory(); + + final String path; + if ( topic == null ) { + path = this.repositoryPath; + } else { + path = this.repositoryPath + '/' + topic.replace('/', '.'); + } + Constraint constraint = qomf.descendantNode(selectorName, path); if ( filterProps != null && filterProps.length > 0 ) { - buffer.append(" ["); - int index = 0; + Constraint orConstraint = null; for (Map<String,Object> template : filterProps) { - if ( index > 0 ) { - buffer.append(" or "); - } - buffer.append('('); + Constraint comp = null; final Iterator<Map.Entry<String, Object>> i = template.entrySet().iterator(); - boolean first = true; while ( i.hasNext() ) { final Map.Entry<String, Object> current = i.next(); // check prop name first @@ -708,28 +714,39 @@ public class TimedJobHandler // check value final Value value = EventHelper.getNodePropertyValue(s.getValueFactory(), current.getValue()); if ( value != null ) { - if ( first ) { - first = false; - buffer.append('@'); + final Comparison newComp = qomf.comparison(qomf.propertyValue(selectorName, propName), + QueryObjectModelFactory.JCR_OPERATOR_EQUAL_TO, + qomf.literal(value)); + if ( comp == null ) { + comp = newComp; } else { - buffer.append(" and @"); + comp = qomf.and(comp, newComp); } - buffer.append(propName); - buffer.append(" = '"); - buffer.append(current.getValue()); - buffer.append("'"); } } } - buffer.append(')'); - index++; + if ( comp != null ) { + if ( orConstraint == null ) { + orConstraint = comp; + } else { + orConstraint = qomf.or(orConstraint, comp); + } + } + } + if ( orConstraint != null ) { + constraint = qomf.and(constraint, orConstraint); } - buffer.append(']'); } - final String queryString = buffer.toString(); - logger.debug("Executing job query {}.", queryString); + final Query q = qomf.createQuery( + qomf.selector(getEventNodeType(), selectorName), + constraint, + null, + null + ); + if ( logger.isDebugEnabled() ) { + logger.debug("Executing job query {}.", q.getStatement()); + } - final Query q = qManager.createQuery(queryString, Query.XPATH); final NodeIterator iter = q.execute().getNodes(); while ( iter.hasNext() ) { final Node eventNode = iter.nextNode(); Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java?rev=936287&r1=936286&r2=936287&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java Wed Apr 21 12:44:53 2010 @@ -33,6 +33,8 @@ import org.slf4j.Logger; */ public final class JobBlockingQueue extends LinkedBlockingQueue<EventInfo> { + private static final long serialVersionUID = -1874643704782461425L; + private volatile EventInfo eventInfo; private final Object lock = new Object(); Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java?rev=936287&r1=936286&r2=936287&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java (original) +++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java Wed Apr 21 12:44:53 2010 @@ -24,6 +24,7 @@ import static org.junit.Assert.assertNot import static org.junit.Assert.assertTrue; import java.util.ArrayList; +import java.util.Calendar; import java.util.Collections; import java.util.Dictionary; import java.util.Hashtable; @@ -56,6 +57,13 @@ public class JobEventHandlerTest extends return this.context; } + @Override + protected Dictionary<String, Object> getComponentConfig() { + final Dictionary<String, Object> config = super.getComponentConfig(); + config.put("cleanup.period", 1); // set clean up to 1 minute + return config; + } + /** * Simple setup test which checks if the session and the session listener * is registered. @@ -360,4 +368,42 @@ public class JobEventHandlerTest extends assertEquals("Started count", 10, started.size()); assertEquals("Failed count", 5, failed.size()); } + + @org.junit.Test public void testCleanup() throws Exception { + final Calendar obsolete = Calendar.getInstance(); + obsolete.add(Calendar.MINUTE, -10); + handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "1").setProperty(EventHelper.NODE_PROPERTY_FINISHED, obsolete); + handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "2").setProperty(EventHelper.NODE_PROPERTY_FINISHED, obsolete); + handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "3").setProperty(EventHelper.NODE_PROPERTY_FINISHED, obsolete); + handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "4").setProperty(EventHelper.NODE_PROPERTY_FINISHED, obsolete); + + final Calendar future = Calendar.getInstance(); + future.add(Calendar.MINUTE, +10); + handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "5").setProperty(EventHelper.NODE_PROPERTY_FINISHED, future); + handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "6").setProperty(EventHelper.NODE_PROPERTY_FINISHED, future); + handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "7"); + handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "8"); + + handler.writerSession.save(); + assertTrue(handler.getWriterRootNode().hasNode("1")); + assertEquals(obsolete, handler.getWriterRootNode().getNode("1").getProperty(EventHelper.NODE_PROPERTY_FINISHED).getDate()); + assertTrue(handler.getWriterRootNode().hasNode("2")); + assertTrue(handler.getWriterRootNode().hasNode("3")); + assertTrue(handler.getWriterRootNode().hasNode("4")); + assertTrue(handler.getWriterRootNode().hasNode("5")); + assertTrue(handler.getWriterRootNode().hasNode("6")); + assertTrue(handler.getWriterRootNode().hasNode("7")); + assertTrue(handler.getWriterRootNode().hasNode("8")); + + ((JobEventHandler)handler).run(); + + assertFalse(handler.getWriterRootNode().hasNode("1")); + assertFalse(handler.getWriterRootNode().hasNode("2")); + assertFalse(handler.getWriterRootNode().hasNode("3")); + assertFalse(handler.getWriterRootNode().hasNode("4")); + assertTrue(handler.getWriterRootNode().hasNode("5")); + assertTrue(handler.getWriterRootNode().hasNode("6")); + assertTrue(handler.getWriterRootNode().hasNode("7")); + assertTrue(handler.getWriterRootNode().hasNode("8")); + } }