Author: cziegeler Date: Tue Oct 19 15:44:50 2010 New Revision: 1024285 URL: http://svn.apache.org/viewvc?rev=1024285&view=rev Log: Ignore emma file. Add new tests for drop and ignore queue. Fix drop queue handling.
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java (with props) sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java (with props) Modified: sling/trunk/bundles/extensions/event/ (props changed) sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Propchange: sling/trunk/bundles/extensions/event/ ------------------------------------------------------------------------------ --- svn:ignore (original) +++ svn:ignore Tue Oct 19 15:44:50 2010 @@ -12,3 +12,5 @@ derby.log .classpath .externalToolBuilders maven-eclipse.xml + +coverage.em Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1024285&r1=1024284&r2=1024285&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Tue Oct 19 15:44:50 2010 @@ -510,10 +510,6 @@ public class PersistenceHandler implemen Event event = null; try { event = this.readEvent(eventNode, false); - if ( shouldProcess ) { - this.process(event); - } - } catch (ClassNotFoundException cnfe) { // store path for lazy loading synchronized ( unloadedJobSet ) { @@ -527,6 +523,7 @@ public class PersistenceHandler implemen if ( event == null ) { try { event = this.readEvent(eventNode, true); + shouldProcess = false; } catch (ClassNotFoundException cnfe) { // this can't occur } catch (RepositoryException re) { @@ -535,9 +532,11 @@ public class PersistenceHandler implemen } if ( event != null ) { 
((DefaultJobManager)this.jobManager).notifyAddJob(new JCRJobEvent(event, this)); + if ( shouldProcess ) { + this.process(event); + } } return shouldProcess && event != null; - } // if the node is finished, this is usually an unlock event ((DefaultJobManager)this.jobManager).notifyRemoveJob(nodePath.substring(this.repositoryPath.length() + 1)); Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java?rev=1024285&view=auto ============================================================================== --- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java (added) +++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java Tue Oct 19 15:44:50 2010 @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.sling.event.impl.jobs; + +import static org.junit.Assert.assertEquals; + +import java.util.Dictionary; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.sling.event.impl.SimpleEventAdmin; +import org.apache.sling.event.impl.jobs.config.ConfigurationConstants; +import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration; +import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager; +import org.apache.sling.event.impl.jobs.jcr.PersistenceHandler; +import org.apache.sling.event.jobs.JobManager; +import org.apache.sling.event.jobs.JobProcessor; +import org.apache.sling.event.jobs.JobUtil; +import org.apache.sling.event.jobs.QueueConfiguration; +import org.jmock.Mockery; +import org.jmock.integration.junit4.JMock; +import org.jmock.integration.junit4.JUnit4Mockery; +import org.junit.runner.RunWith; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventHandler; + +@RunWith(JMock.class) +public class DropQueueTest extends AbstractJobEventHandlerTest { + + private static final String QUEUE_NAME = "orderedtest"; + private static final String TOPIC = "sling/test"; + private static int NUM_JOBS = 10; + + protected Mockery context; + + private InternalQueueConfiguration mainConfiguration; + + public DropQueueTest() { + this.context = new JUnit4Mockery(); + } + + @Override + protected Mockery getMockery() { + return this.context; + } + + @Override + protected Hashtable<String, Object> getComponentConfig() { + final Hashtable<String, Object> config = super.getComponentConfig(); + config.put("cleanup.period", 1); // set clean up to 1 minute + config.put("load.delay", 1); // load delay to 1 sec + return config; + } + + private void createConfiguration(final QueueConfiguration.Type type) { + // create a new dictionary with the missing info and do some sanety puts + final Map<String, Object> queueProps = new 
HashMap<String, Object>(); + queueProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC); + queueProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME); + queueProps.put(ConfigurationConstants.PROP_TYPE, type); + queueProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, new Integer(3)); + + this.mainConfiguration = InternalQueueConfiguration.fromConfiguration(queueProps); + } + + @Override + protected QueueConfigurationManager createQueueConfigManager() { + this.createConfiguration(QueueConfiguration.Type.DROP); + return new QueueConfigurationManager() { + + @Override + public InternalQueueConfiguration[] getConfigurations() { + return new InternalQueueConfiguration[] {mainConfiguration}; + } + }; + } + + /** + * Helper method to create a job event. + */ + private Event getJobEvent() { + final Dictionary<String, Object> props = new Hashtable<String, Object>(); + props.put(JobUtil.PROPERTY_JOB_TOPIC, TOPIC); + return new Event(JobUtil.TOPIC_JOB, props); + } + + @org.junit.Test public void testDroppingQueue() throws Exception { + final PersistenceHandler jeh = this.handler; + + // set new event admin + final AtomicInteger count = new AtomicInteger(0); + setEventAdmin(new SimpleEventAdmin(new String[] {TOPIC }, + new EventHandler[] { + new EventHandler() { + public void handleEvent(final Event event) { + JobUtil.processJob(event, new JobProcessor() { + + public boolean process(Event job) { + count.incrementAndGet(); + return true; + } + }); + } + }})); + // we start "some" jobs: + for(int i = 0; i < NUM_JOBS; i++ ) { + jeh.handleEvent(getJobEvent()); + } + // we wait a little bit + Thread.sleep(400); + // no jobs queued, none processed and no available + assertEquals(0, this.jobManager.getStatistics().getNumberOfQueuedJobs()); + assertEquals(0, count.get()); + assertEquals(0, this.jobManager.queryJobs(JobManager.QueryType.ALL, TOPIC).getSize()); + + // let'see if restarting helps + this.createConfiguration(QueueConfiguration.Type.UNORDERED); + this.jobManager.restart(); 
+ // we wait a little bit + Thread.sleep(400); + // no jobs queued, none processed and no available + assertEquals(0, this.jobManager.getStatistics().getNumberOfQueuedJobs()); + assertEquals(0, count.get()); + assertEquals(0, this.jobManager.queryJobs(JobManager.QueryType.ALL, TOPIC).getSize()); + } +} Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java?rev=1024285&view=auto ============================================================================== --- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java (added) +++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java Tue Oct 19 15:44:50 2010 @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl.jobs; + +import static org.junit.Assert.assertEquals; + +import java.util.Dictionary; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.sling.event.impl.SimpleEventAdmin; +import org.apache.sling.event.impl.jobs.config.ConfigurationConstants; +import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration; +import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager; +import org.apache.sling.event.impl.jobs.jcr.PersistenceHandler; +import org.apache.sling.event.jobs.JobManager; +import org.apache.sling.event.jobs.JobProcessor; +import org.apache.sling.event.jobs.JobUtil; +import org.apache.sling.event.jobs.QueueConfiguration; +import org.jmock.Mockery; +import org.jmock.integration.junit4.JMock; +import org.jmock.integration.junit4.JUnit4Mockery; +import org.junit.runner.RunWith; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventHandler; + +@RunWith(JMock.class) +public class IgnoreQueueTest extends AbstractJobEventHandlerTest { + + private static final String QUEUE_NAME = "orderedtest"; + private static final String TOPIC = "sling/test"; + private static int NUM_JOBS = 10; + + protected Mockery context; + + private InternalQueueConfiguration mainConfiguration; + + public IgnoreQueueTest() { + this.context = new JUnit4Mockery(); + } + + @Override + protected Mockery getMockery() { + return this.context; + } + + @Override + protected Hashtable<String, Object> 
getComponentConfig() { + final Hashtable<String, Object> config = super.getComponentConfig(); + config.put("cleanup.period", 1); // set clean up to 1 minute + config.put("load.delay", 1); // load delay to 1 sec + return config; + } + + private void createConfiguration(final QueueConfiguration.Type type) { + // create a new dictionary with the missing info and do some sanety puts + final Map<String, Object> queueProps = new HashMap<String, Object>(); + queueProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC); + queueProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME); + queueProps.put(ConfigurationConstants.PROP_TYPE, type); + queueProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, new Integer(3)); + + this.mainConfiguration = InternalQueueConfiguration.fromConfiguration(queueProps); + } + + @Override + protected QueueConfigurationManager createQueueConfigManager() { + this.createConfiguration(QueueConfiguration.Type.IGNORE); + return new QueueConfigurationManager() { + + @Override + public InternalQueueConfiguration[] getConfigurations() { + return new InternalQueueConfiguration[] {mainConfiguration}; + } + }; + } + + /** + * Helper method to create a job event. 
+ */ + private Event getJobEvent() { + final Dictionary<String, Object> props = new Hashtable<String, Object>(); + props.put(JobUtil.PROPERTY_JOB_TOPIC, TOPIC); + return new Event(JobUtil.TOPIC_JOB, props); + } + + @org.junit.Test public void testDroppingQueue() throws Exception { + final PersistenceHandler jeh = this.handler; + + // set new event admin + final AtomicInteger count = new AtomicInteger(0); + setEventAdmin(new SimpleEventAdmin(new String[] {TOPIC }, + new EventHandler[] { + new EventHandler() { + public void handleEvent(final Event event) { + JobUtil.processJob(event, new JobProcessor() { + + public boolean process(Event job) { + count.incrementAndGet(); + return true; + } + }); + } + }})); + // we start "some" jobs: + for(int i = 0; i < NUM_JOBS; i++ ) { + jeh.handleEvent(getJobEvent()); + } + // we wait a little bit + Thread.sleep(400); + // no jobs queued, none processed but available + assertEquals(0, this.jobManager.getStatistics().getNumberOfQueuedJobs()); + assertEquals(0, this.jobManager.getStatistics().getNumberOfProcessedJobs()); + assertEquals(0, count.get()); + assertEquals(NUM_JOBS, this.jobManager.queryJobs(JobManager.QueryType.ALL, TOPIC).getSize()); + + // let'see if restarting helps + this.createConfiguration(QueueConfiguration.Type.UNORDERED); + this.jobManager.restart(); + // we wait + while ( count.get() < NUM_JOBS ) { + try { + Thread.sleep(500); + } catch (InterruptedException ie) { + // ignore + } + } + // no jobs queued, but processed and not available + assertEquals(0, this.jobManager.getStatistics().getNumberOfQueuedJobs()); + assertEquals(NUM_JOBS, this.jobManager.getStatistics().getNumberOfProcessedJobs()); + assertEquals(NUM_JOBS, count.get()); + assertEquals(0, this.jobManager.queryJobs(JobManager.QueryType.ALL, TOPIC).getSize()); + } +} Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java 
------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java ------------------------------------------------------------------------------ svn:mime-type = text/plain