Author: cziegeler
Date: Mon Apr 6 15:15:42 2009
New Revision: 762385
URL: http://svn.apache.org/viewvc?rev=762385&view=rev
Log:
Add new configuration property to handle maximum number of parallel processes
for the main queue. Optimize acknowledge handling.
Modified:
incubator/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
incubator/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
Modified:
incubator/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL:
http://svn.apache.org/viewvc/incubator/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=762385&r1=762384&r2=762385&view=diff
==============================================================================
---
incubator/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
(original)
+++
incubator/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Mon Apr 6 15:15:42 2009
@@ -98,6 +98,12 @@
/** Default number of seconds to wait for an ack. */
private static final long DEFAULT_WAIT_FOR_ACK = 90; // by default we wait
90 secs
+ /** @scr.property valueRef="DEFAULT_MAXIMUM_PARALLEL_JOBS" */
+ private static final String CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS =
"max.parallel.jobs";
+
+ /** Default number of parallel jobs. */
+ private static final long DEFAULT_MAXIMUM_PARALLEL_JOBS = 15;
+
/** @scr.property valueRef="DEFAULT_WAIT_FOR_ACK" */
private static final String CONFIG_PROPERTY_WAIT_FOR_ACK = "wait.for.ack";
@@ -110,6 +116,9 @@
/** How long do we wait for an ack (in ms) */
private long waitForAckMs;
+ /** Maximum number of parallel running jobs for the main queue. */
+ private long maximumParallelJobs;
+
/** Background session. */
private Session backgroundSession;
@@ -145,6 +154,9 @@
/** Sync lock */
private final Object backgroundLock = new Object();
+ /** Number of parallel jobs for the main queue. */
+ private long parallelJobCount;
+
/**
* Activate this component.
* @param context
@@ -158,6 +170,7 @@
this.sleepTime =
OsgiUtil.toLong(props.get(CONFIG_PROPERTY_SLEEP_TIME), DEFAULT_SLEEP_TIME);
this.maxJobRetries =
OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAX_JOB_RETRIES),
DEFAULT_MAX_JOB_RETRIES);
this.waitForAckMs =
OsgiUtil.toLong(props.get(CONFIG_PROPERTY_WAIT_FOR_ACK), DEFAULT_WAIT_FOR_ACK)
* 1000;
+ this.maximumParallelJobs =
OsgiUtil.toLong(props.get(CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS),
DEFAULT_MAXIMUM_PARALLEL_JOBS);
this.componentContext = context;
super.activate(context);
JOB_THREAD_POOL = this.threadPool;
@@ -240,17 +253,9 @@
final Map.Entry<String, StartedJobInfo> entry = i.next();
if ( entry.getValue().started <= tooOld ) {
restartJobs.add(entry.getValue());
- i.remove();
}
}
}
- final Iterator<StartedJobInfo> jobIter = restartJobs.iterator();
- while ( jobIter.hasNext() ) {
- final StartedJobInfo info = jobIter.next();
- this.logger.info("No acknowledge received for job {} stored at
{}. Requeueing job.", info.event, info.nodePath);
- this.finishedJob(info.event, info.nodePath, true);
- }
-
// remove obsolete jobs from the repository
if ( this.cleanupPeriod > 0 ) {
this.logger.debug("Cleaning up repository, removing all
finished jobs older than {} minutes.", this.cleanupPeriod);
@@ -282,6 +287,29 @@
}
}
}
+ // restart jobs is now a list of potential candidates, we now have
to check
+ // each candidate separately again!
+ if ( restartJobs.size() > 0 ) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ // we just ignore this
+ e.printStackTrace();
+ }
+ }
+ final Iterator<StartedJobInfo> jobIter = restartJobs.iterator();
+ while ( jobIter.hasNext() ) {
+ final StartedJobInfo info = jobIter.next();
+ boolean process = false;
+ synchronized ( this.processingEventsList ) {
+ process = this.processingEventsList.remove(info.nodePath)
!= null;
+ }
+ if ( process ) {
+ this.logger.info("No acknowledge received for job {}
stored at {}. Requeueing job.", info.event, info.nodePath);
+ this.finishedJob(info.event, info.nodePath, true);
+ }
+ }
+
// check for idle threads
synchronized ( this.jobQueues ) {
final Iterator<Map.Entry<String, JobBlockingQueue>> i =
this.jobQueues.entrySet().iterator();
@@ -600,6 +628,7 @@
*/
private boolean executeJob(final EventInfo info, final
BlockingQueue<EventInfo> jobQueue) {
boolean putback = false;
+ boolean wait = false;
synchronized (this.backgroundLock) {
try {
this.backgroundSession.refresh(false);
@@ -624,6 +653,12 @@
}
}
+ } else {
+ // check number of parallel jobs for main queue
+ if ( jobQueue == null && this.parallelJobCount >=
this.maximumParallelJobs ) {
+ process = false;
+ wait = true;
+ }
}
if ( process ) {
boolean unlock = true;
@@ -639,7 +674,7 @@
}
if ( process ) {
unlock = false;
- this.processJob(info.event, eventNode);
+ this.processJob(info.event, eventNode,
jobQueue == null);
return true;
}
}
@@ -671,6 +706,17 @@
}
}
+ // if this is the main queue and we have reached the max number of
parallel jobs
+ // we wait a little bit before continuing
+ if ( wait ) {
+ try {
+ Thread.sleep(sleepTime * 1000);
+ } catch (InterruptedException ie) {
+ // ignore
+ ignoreException(ie);
+ }
+ }
+ // if we have to put back the job, we do it now
if ( putback ) {
final EventInfo eInfo = info;
final Date fireDate = new Date();
@@ -823,13 +869,17 @@
* Process a job and unlock the node in the repository.
* @param event The original event.
* @param eventNode The node in the repository where the job is stored.
+ * @param isMainQueue Is this the main queue?
*/
- private void processJob(Event event, Node eventNode) {
+ private void processJob(Event event, Node eventNode, boolean isMainQueue)
{
final boolean parallelProcessing =
event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
||
event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
final String jobTopic =
(String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
boolean unlock = true;
try {
+ if ( isMainQueue && !parallelProcessing ) {
+ this.parallelJobCount++;
+ }
final String nodePath = eventNode.getPath();
final Event jobEvent = this.getJobEvent(event, nodePath);
eventNode.setProperty(EventHelper.NODE_PROPERTY_PROCESSOR,
this.applicationId);
@@ -856,6 +906,9 @@
this.logger.error("Exception during job processing.", re);
} finally {
if ( unlock ) {
+ if ( isMainQueue && !parallelProcessing ) {
+ this.parallelJobCount--;
+ }
if ( !parallelProcessing ) {
synchronized ( this.processingMap ) {
this.processingMap.put(jobTopic, Boolean.FALSE);
@@ -1130,6 +1183,10 @@
synchronized ( this.processingMap ) {
this.processingMap.put(jobTopic, Boolean.FALSE);
}
+ } else {
+ if (
job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) == null ) {
+ this.parallelJobCount--;
+ }
}
if ( unlock ) {
synchronized ( this.deletedJobs ) {
Modified:
incubator/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL:
http://svn.apache.org/viewvc/incubator/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=762385&r1=762384&r2=762385&view=diff
==============================================================================
---
incubator/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
(original)
+++
incubator/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
Mon Apr 6 15:15:42 2009
@@ -70,6 +70,11 @@
receive such a message in the configured time, it reschedules the job. The
configured \
time is in seconds (default is 90 secs).
+max.parallel.jobs.name = Maximum Parallel Jobs
+max.parallel.jobs.description = The maximum number of parallel jobs started
for the main \
+ queue.
+
+
#
# Event Pool
event.pool.name = Apache Sling Event Thread Pool