Author: cziegeler
Date: Wed Apr 24 09:58:34 2013
New Revision: 1471338
URL: http://svn.apache.org/r1471338
Log:
SLING-2829 : Add API for starting a job and service interface for executing a
job
Added:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
- copied, changed from r1470959,
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java
- copied, changed from r1470959,
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobConsumer.java
Removed:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobConsumer.java
Modified:
sling/trunk/bundles/extensions/event/pom.xml
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
Modified: sling/trunk/bundles/extensions/event/pom.xml
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Wed Apr 24 09:58:34 2013
@@ -74,6 +74,7 @@
<Export-Package>
org.apache.sling.event;version=2.4.0,
org.apache.sling.event.jobs;version=1.2.0,
+ org.apache.sling.event.jobs.consumer;version=1.0.0,
org.apache.sling.event.jobs.jmx;version=1.0.0
</Export-Package>
<Private-Package>
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
Wed Apr 24 09:58:34 2013
@@ -40,7 +40,7 @@ import org.apache.sling.commons.osgi.Pro
import org.apache.sling.discovery.PropertyProvider;
import org.apache.sling.event.impl.support.TopicMatcher;
import org.apache.sling.event.impl.support.TopicMatcherHelper;
-import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceRegistration;
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
Wed Apr 24 09:58:34 2013
@@ -25,8 +25,8 @@ import java.util.Set;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.api.wrappers.ValueMapDecorator;
import org.apache.sling.event.impl.support.ResourceHelper;
-import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobUtil.JobPriority;
+import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.Queue;
/**
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
Wed Apr 24 09:58:34 2013
@@ -66,11 +66,11 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.stats.TopicStatisticsImpl;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
-import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobConsumer;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.JobUtil.JobPriority;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobsIterator;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.QueueConfiguration;
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
Wed Apr 24 09:58:34 2013
@@ -29,8 +29,8 @@ import java.util.TreeMap;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.TopologyView;
import
org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
-import org.apache.sling.event.jobs.JobConsumer;
import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java
Wed Apr 24 09:58:34 2013
@@ -38,9 +38,9 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobConsumer;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventConstants;
import org.osgi.service.event.EventHandler;
@@ -214,8 +214,8 @@ public class EventAdminBridge
}
@Override
- public boolean process(final Job job) {
- // this is never been called, but we throw anyway!
- throw new UnsupportedOperationException();
+ public JobResult process(final Job job) {
+ // this is never called, but we return something anyway
+ return JobResult.CANCEL;
}
}
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
Wed Apr 24 09:58:34 2013
@@ -37,10 +37,11 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.stats.StatisticsImpl;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobConsumer;
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.Statistics;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.apache.sling.event.jobs.consumer.JobConsumer.JobResult;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
@@ -262,40 +263,48 @@ public abstract class AbstractJobQueue
return ack != null;
}
- private boolean handleReschedule(final JobHandler jobEvent, final boolean
shouldReschedule) {
- boolean reschedule = shouldReschedule;
- if ( shouldReschedule ) {
- // check if we exceeded the number of retries
- int retries = (Integer)
jobEvent.getJob().getProperty(Job.PROPERTY_JOB_RETRIES);
- int retryCount =
(Integer)jobEvent.getJob().getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
-
- retryCount++;
- if ( retries != -1 && retryCount > retries ) {
- reschedule = false;
- }
- if ( reschedule ) {
- // update event with retry count and retries
- jobEvent.getJob().retry();
+ private boolean handleReschedule(final JobHandler jobEvent, final
JobConsumer.JobResult result) {
+ boolean reschedule = false;
+ switch ( result ) {
+ case OK : // job is finished
if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Failed job {}",
Utility.toString(jobEvent.getJob()));
+ this.logger.debug("Finished job {}",
Utility.toString(jobEvent.getJob()));
}
- this.failedJob();
- jobEvent.queued = System.currentTimeMillis();
- Utility.sendNotification(this.eventAdmin,
JobUtil.TOPIC_JOB_FAILED, jobEvent.getJob(), null);
- } else {
+ final long processingTime = System.currentTimeMillis() -
jobEvent.started;
+ this.finishedJob(processingTime);
+ Utility.sendNotification(this.eventAdmin,
JobUtil.TOPIC_JOB_FINISHED, jobEvent.getJob(), processingTime);
+ break;
+ case FAILED : // check if we exceeded the number of retries
+ int retries = (Integer)
jobEvent.getJob().getProperty(Job.PROPERTY_JOB_RETRIES);
+ int retryCount =
(Integer)jobEvent.getJob().getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
+
+ retryCount++;
+ if ( retries != -1 && retryCount > retries ) {
+ reschedule = false;
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Cancelled job {}",
Utility.toString(jobEvent.getJob()));
+ }
+ this.cancelledJob();
+ Utility.sendNotification(this.eventAdmin,
JobUtil.TOPIC_JOB_CANCELLED, jobEvent.getJob(), null);
+ } else {
+ reschedule = true;
+ // update event with retry count and retries
+ jobEvent.getJob().retry();
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Failed job {}",
Utility.toString(jobEvent.getJob()));
+ }
+ this.failedJob();
+ jobEvent.queued = System.currentTimeMillis();
+ Utility.sendNotification(this.eventAdmin,
JobUtil.TOPIC_JOB_FAILED, jobEvent.getJob(), null);
+ }
+ break;
+ case CANCEL : // consumer cancelled the job
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("Cancelled job {}",
Utility.toString(jobEvent.getJob()));
}
this.cancelledJob();
Utility.sendNotification(this.eventAdmin,
JobUtil.TOPIC_JOB_CANCELLED, jobEvent.getJob(), null);
- }
- } else {
- if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Finished job {}",
Utility.toString(jobEvent.getJob()));
- }
- final long processingTime = System.currentTimeMillis() -
jobEvent.started;
- this.finishedJob(processingTime);
- Utility.sendNotification(this.eventAdmin,
JobUtil.TOPIC_JOB_FINISHED, jobEvent.getJob(), processingTime);
+ break;
}
return reschedule;
@@ -307,18 +316,12 @@ public abstract class AbstractJobQueue
@Override
public boolean finishedJob(final Event job, final boolean
shouldReschedule) {
final String location = (String)job.getProperty(JobUtil.JOB_ID);
- return this.finishedJob(location, shouldReschedule);
+ return this.finishedJob(location, shouldReschedule ? JobResult.FAILED
: JobResult.OK);
}
- private boolean finishedJob(final String jobId, final boolean
shouldReschedule) {
+ private boolean finishedJob(final String jobId, final
JobConsumer.JobResult result) {
if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Received finish for job {},
shouldReschedule={}", jobId, shouldReschedule);
- }
- if ( !this.running ) {
- if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Queue is not running anymore. Discarding
finish for {}", jobId);
- }
- return false;
+ this.logger.debug("Received finish for job {}, result={}", jobId,
result);
}
// let's remove the event from our processing list
// this is just a sanity check, as usually the job should have been
@@ -327,11 +330,19 @@ public abstract class AbstractJobQueue
this.startedJobsLists.remove(jobId);
}
- // get job event
+ // get job handler
final JobHandler info;
synchronized ( this.processsingJobsLists ) {
info = this.processsingJobsLists.remove(jobId);
}
+
+ if ( !this.running ) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Queue is not running anymore. Discarding
finish for {}", jobId);
+ }
+ return false;
+ }
+
if ( info == null ) {
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("This job has never been started by this
queue: {}", jobId);
@@ -340,7 +351,7 @@ public abstract class AbstractJobQueue
}
// handle the reschedule, a new job might be returned with updated
reschedule info!
- final boolean reschedule = this.handleReschedule(info,
shouldReschedule);
+ final boolean reschedule = this.handleReschedule(info, result);
// if this is set after the synchronized block we have an error
final boolean finishSuccessful;
@@ -502,22 +513,18 @@ public abstract class AbstractJobQueue
break;
}
}
- boolean result = false;
+ JobConsumer.JobResult result =
JobConsumer.JobResult.CANCEL;
try {
result = consumer.process(info.getJob());
} catch (final Throwable t) { //NOSONAR
logger.error("Unhandled error occured in
job processor " + t.getMessage() + " while processing job " +
Utility.toString(info.getJob()), t);
// we don't reschedule if an exception
occurs
- result = true;
+ result = JobConsumer.JobResult.CANCEL;
} finally {
currentThread.setPriority(oldPriority);
currentThread.setName(oldName);
if ( notifyResult ) {
- if ( result ) {
- finishedJob(info.getJob().getId(),
false);
- } else {
- finishedJob(info.getJob().getId(),
true);
- }
+ finishedJob(info.getJob().getId(),
result);
}
}
}
Copied:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
(from r1470959,
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java)
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java&r1=1470959&r2=1471338&rev=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
Wed Apr 24 09:58:34 2013
@@ -21,6 +21,8 @@ package org.apache.sling.event.jobs;
import java.util.Calendar;
import java.util.Set;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+
/**
* A job
*
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
Wed Apr 24 09:58:34 2013
@@ -18,6 +18,7 @@
*/
package org.apache.sling.event.jobs;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.event.Event;
/**
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java
Wed Apr 24 09:58:34 2013
@@ -23,6 +23,7 @@ import java.util.Calendar;
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.event.Event;
import org.slf4j.LoggerFactory;
Copied:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java
(from r1470959,
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobConsumer.java)
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobConsumer.java&r1=1470959&r2=1471338&rev=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobConsumer.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java
Wed Apr 24 09:58:34 2013
@@ -16,7 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.event.jobs;
+package org.apache.sling.event.jobs.consumer;
+
+import org.apache.sling.event.jobs.Job;
+
/**
@@ -45,6 +48,11 @@ package org.apache.sling.event.jobs;
*/
public interface JobConsumer {
+ enum JobResult {
+ OK,
+ FAILED,
+ CANCEL
+ }
/**
* Service registration property defining the jobs this consumer is able
to process.
* The value is either a string or an array of strings.
@@ -53,12 +61,18 @@ public interface JobConsumer {
/**
* Execute the job.
- * If the job fails with throwing an exception/throwable, the process will
not be rescheduled.
- * However in this case the job will be treated as run successfully.
+ *
+ * If the job has been processed successfully, {@link JobResult#OK}
should be returned.
+ * If the job has not been processed completely, but might be rescheduled,
{@link JobResult#FAILED}
+ * should be returned.
+ * If the job processing failed and should not be rescheduled, {@link
JobResult#CANCEL} should
+ * be returned.
+ *
+ * If the processing fails by throwing an exception/throwable, the
job will not be rescheduled
+ * and is treated as if the method had returned {@link
JobResult#CANCEL}.
*
* @param job The job
- * @return True if the job could be finished (either successful or by an
error).
- * Return false if the job should be rescheduled.
+ * @return The job result
*/
- boolean process(Job job);
+ JobResult process(Job job);
}
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
(original)
+++
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
Wed Apr 24 09:58:34 2013
@@ -31,8 +31,8 @@ import java.util.Hashtable;
import javax.inject.Inject;
import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
-import org.apache.sling.event.jobs.JobConsumer;
import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.apache.sling.launchpad.api.StartupHandler;
import org.apache.sling.launchpad.api.StartupMode;
import org.ops4j.pax.exam.CoreOptions;
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
(original)
+++
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
Wed Apr 24 09:58:34 2013
@@ -35,10 +35,10 @@ import java.util.concurrent.atomic.Atomi
import org.apache.sling.event.EventPropertiesMap;
import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobConsumer;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.ExamReactorStrategy;
@@ -83,9 +83,9 @@ public class ClassloadingTest extends Ab
new JobConsumer() {
@Override
- public boolean process(Job job) {
+ public JobResult process(Job job) {
count.incrementAndGet();
- return true;
+ return JobResult.OK;
}
});
final ServiceRegistration ehReg =
this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
@@ -147,9 +147,9 @@ public class ClassloadingTest extends Ab
new JobConsumer() {
@Override
- public boolean process(Job job) {
+ public JobResult process(Job job) {
count.incrementAndGet();
- return true;
+ return JobResult.OK;
}
});
final ServiceRegistration ehReg =
this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java
(original)
+++
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java
Wed Apr 24 09:58:34 2013
@@ -27,10 +27,10 @@ import java.util.concurrent.atomic.Atomi
import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobConsumer;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.ExamReactorStrategy;
@@ -76,9 +76,9 @@ public class DropQueueTest extends Abstr
new JobConsumer() {
@Override
- public boolean process(Job job) {
+ public JobResult process(Job job) {
count.incrementAndGet();
- return true;
+ return JobResult.OK;
}
});
final ServiceRegistration ehReg =
this.registerEventHandler(JobUtil.TOPIC_JOB_CANCELLED,
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java
(original)
+++
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java
Wed Apr 24 09:58:34 2013
@@ -27,9 +27,9 @@ import java.util.concurrent.atomic.Atomi
import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobConsumer;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.ExamReactorStrategy;
@@ -71,9 +71,9 @@ public class IgnoreQueueTest extends Abs
new JobConsumer() {
@Override
- public boolean process(Job job) {
+ public JobResult process(Job job) {
count.incrementAndGet();
- return true;
+ return JobResult.OK;
}
});
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
(original)
+++
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
Wed Apr 24 09:58:34 2013
@@ -37,11 +37,11 @@ import javax.inject.Inject;
import org.apache.sling.event.impl.Barrier;
import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobConsumer;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.JobProcessor;
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -181,9 +181,9 @@ public class JobHandlingTest extends Abs
new JobConsumer() {
@Override
- public boolean process(Job job) {
+ public JobResult process(Job job) {
cb.block();
- return true;
+ return JobResult.OK;
}
});
try {
@@ -208,10 +208,10 @@ public class JobHandlingTest extends Abs
new JobConsumer() {
@Override
- public boolean process(Job job) {
+ public JobResult process(Job job) {
cb.block();
cb2.block();
- return false;
+ return JobResult.FAILED;
}
});
try {
@@ -248,10 +248,10 @@ public class JobHandlingTest extends Abs
new JobConsumer() {
@Override
- public boolean process(Job job) {
+ public JobResult process(Job job) {
cb.block();
sleep(1000);
- return false;
+ return JobResult.FAILED;
}
});
try {
@@ -283,7 +283,7 @@ public class JobHandlingTest extends Abs
int retryCount;
@Override
- public boolean process(Job job) {
+ public JobResult process(Job job) {
int retry = 0;
if ( job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT) !=
null ) {
retry =
(Integer)job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
@@ -293,7 +293,7 @@ public class JobHandlingTest extends Abs
}
retryCount++;
cb.block();
- return false;
+ return JobResult.FAILED;
}
});
try {
@@ -330,15 +330,15 @@ public class JobHandlingTest extends Abs
new JobConsumer() {
@Override
- public boolean process(Job job) {
+ public JobResult process(Job job) {
// events 1 and 4 finish the first time
final String id = job.getName();
if ( "1".equals(id) || "4".equals(id) ) {
- return true;
+ return JobResult.OK;
// 5 fails always
} else if ( "5".equals(id) ) {
- return false;
+ return JobResult.FAILED;
} else {
int retry = 0;
if ( job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT)
!= null ) {
@@ -347,21 +347,21 @@ public class JobHandlingTest extends Abs
// 2 fails the first time
if ( "2".equals(id) ) {
if ( retry == 0 ) {
- return false;
+ return JobResult.FAILED;
} else {
- return true;
+ return JobResult.OK;
}
}
// 3 fails the first and second time
if ( "3".equals(id) ) {
if ( retry == 0 || retry == 1 ) {
- return false;
+ return JobResult.FAILED;
} else {
- return true;
+ return JobResult.OK;
}
}
}
- return false;
+ return JobResult.FAILED;
}
});
final ServiceRegistration eh1Reg =
this.registerEventHandler(JobUtil.TOPIC_JOB_CANCELLED,
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
(original)
+++
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
Wed Apr 24 09:58:34 2013
@@ -31,11 +31,11 @@ import java.util.concurrent.atomic.Atomi
import org.apache.sling.event.impl.Barrier;
import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobConsumer;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -80,9 +80,9 @@ public class OrderedQueueTest extends Ab
new JobConsumer() {
@Override
- public boolean process(final Job job) {
+ public JobResult process(final Job job) {
cb.block();
- return true;
+ return JobResult.OK;
}
});
@@ -93,17 +93,17 @@ public class OrderedQueueTest extends Ab
new JobConsumer() {
@Override
- public boolean process(final Job job) {
+ public JobResult process(final Job job) {
if ( parallelCount.incrementAndGet() > 1 ) {
parallelCount.decrementAndGet();
- return false;
+ return JobResult.FAILED;
}
final String topic = job.getTopic();
if ( topic.endsWith("sub1") ) {
final int i =
(Integer)job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
if ( i == 0 ) {
parallelCount.decrementAndGet();
- return false;
+ return JobResult.FAILED;
}
}
try {
@@ -112,7 +112,7 @@ public class OrderedQueueTest extends Ab
// ignore
}
parallelCount.decrementAndGet();
- return true;
+ return JobResult.OK;
}
});
final ServiceRegistration ehReg =
this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java?rev=1471338&r1=1471337&r2=1471338&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
(original)
+++
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
Wed Apr 24 09:58:34 2013
@@ -30,11 +30,11 @@ import java.util.concurrent.atomic.Atomi
import org.apache.sling.event.impl.Barrier;
import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobConsumer;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.ExamReactorStrategy;
@@ -81,9 +81,9 @@ public class RoundRobinQueueTest extends
new JobConsumer() {
@Override
- public boolean process(final Job job) {
+ public JobResult process(final Job job) {
cb.block();
- return true;
+ return JobResult.OK;
}
});
@@ -94,14 +94,14 @@ public class RoundRobinQueueTest extends
new JobConsumer() {
@Override
- public boolean process(final Job job) {
+ public JobResult process(final Job job) {
if ( parallelCount.incrementAndGet() > MAX_PAR ) {
parallelCount.decrementAndGet();
- return false;
+ return JobResult.FAILED;
}
sleep(30);
parallelCount.decrementAndGet();
- return true;
+ return JobResult.OK;
}
});
final ServiceRegistration ehReg =
this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,