Author: bchapuis
Date: Tue Feb 8 09:10:22 2011
New Revision: 1068297
URL: http://svn.apache.org/viewvc?rev=1068297&view=rev
Log:
Removed the projects which were not tied whith droids-core. Removed some ant
and forest stuffs.
Added:
incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/TaskMasterImpl.java
Removed:
incubator/droids/branch/bchapuis/build-doc.sh
incubator/droids/branch/bchapuis/build.xml
incubator/droids/branch/bchapuis/default.properties
incubator/droids/branch/bchapuis/docs/api/
incubator/droids/branch/bchapuis/docs/broken-links.xml
incubator/droids/branch/bchapuis/docs/changes.dispatcher.css
incubator/droids/branch/bchapuis/docs/changes.html
incubator/droids/branch/bchapuis/docs/changes.pdf
incubator/droids/branch/bchapuis/docs/changes.rss
incubator/droids/branch/bchapuis/docs/contrib.dispatcher.css
incubator/droids/branch/bchapuis/docs/contrib.html
incubator/droids/branch/bchapuis/docs/contrib.pdf
incubator/droids/branch/bchapuis/docs/default.dispatcher.css
incubator/droids/branch/bchapuis/docs/default.html
incubator/droids/branch/bchapuis/docs/default.pdf
incubator/droids/branch/bchapuis/docs/develop/
incubator/droids/branch/bchapuis/docs/develope.dispatcher.css
incubator/droids/branch/bchapuis/docs/develope.html
incubator/droids/branch/bchapuis/docs/develope.pdf
incubator/droids/branch/bchapuis/docs/favicon.ico
incubator/droids/branch/bchapuis/docs/images/
incubator/droids/branch/bchapuis/docs/index.dispatcher.css
incubator/droids/branch/bchapuis/docs/index.html
incubator/droids/branch/bchapuis/docs/index.pdf
incubator/droids/branch/bchapuis/docs/install.dispatcher.css
incubator/droids/branch/bchapuis/docs/install.html
incubator/droids/branch/bchapuis/docs/install.pdf
incubator/droids/branch/bchapuis/docs/linkmap.dispatcher.css
incubator/droids/branch/bchapuis/docs/linkmap.html
incubator/droids/branch/bchapuis/docs/linkmap.pdf
incubator/droids/branch/bchapuis/docs/locationmap.xml
incubator/droids/branch/bchapuis/docs/mail-lists.dispatcher.css
incubator/droids/branch/bchapuis/docs/mail-lists.html
incubator/droids/branch/bchapuis/docs/mail-lists.pdf
incubator/droids/branch/bchapuis/docs/themes/
incubator/droids/branch/bchapuis/docs/todo.dispatcher.css
incubator/droids/branch/bchapuis/docs/todo.html
incubator/droids/branch/bchapuis/docs/todo.pdf
incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/SequentialTaskMaster.java
incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/SimpleTaskQueue.java
incubator/droids/branch/bchapuis/droids-crawler/
incubator/droids/branch/bchapuis/droids-crawler-web/
incubator/droids/branch/bchapuis/forrest.properties
incubator/droids/branch/bchapuis/forrest.properties.xml
incubator/droids/branch/bchapuis/ivy.xml
incubator/droids/branch/bchapuis/tools/
Added:
incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/TaskMasterImpl.java
URL:
http://svn.apache.org/viewvc/incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/TaskMasterImpl.java?rev=1068297&view=auto
==============================================================================
---
incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/TaskMasterImpl.java
(added)
+++
incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/TaskMasterImpl.java
Tue Feb 8 09:10:22 2011
@@ -0,0 +1,380 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.droids.impl;
+
+import java.util.Date;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.droids.api.DelayTimer;
+import org.apache.droids.api.Droid;
+import org.apache.droids.api.Task;
+import org.apache.droids.api.TaskExceptionHandler;
+import org.apache.droids.api.TaskExceptionResult;
+import org.apache.droids.api.TaskMaster;
+import org.apache.droids.api.TaskMaster.ExecutionState;
+import org.apache.droids.api.WorkMonitor;
+import org.apache.droids.api.Worker;
+import org.apache.droids.helper.Loggable;
+
+/**
+ *
+ * @author bchapuis
+ */
+public class TaskMasterImpl<T extends Task> extends Loggable implements
TaskMaster<T>
+{
+
+ private final long TICKLE_TIME = 250L;
+
+ /**
+ * The execution state
+ */
+ protected volatile ExecutionState state = ExecutionState.STOPPED;
+
+ /**
+ * The delay timer
+ */
+ protected DelayTimer delayTimer;
+
+ /**
+ * The start time
+ */
+ protected Date startTime;
+
+ /**
+ * The end time
+ */
+ protected Date endTime;
+
+ /**
+ * The last completed task
+ */
+ protected T lastCompletedTask;
+
+ /**
+ * The completed task counter
+ */
+ protected AtomicLong completedTasks = new AtomicLong();
+
+ /**
+ * The monitor that that records the processing of tasks
+ */
+ protected WorkMonitor<T> monitor;
+
+ /**
+ * The task exception handler
+ */
+ protected TaskExceptionHandler exceptionHandler;
+
+ /*
+ * The pool size
+ */
+ private int poolSize = 1;
+
+ /**
+ * The pool
+ */
+ private TaskExecutorPool pool ;
+
+ @Override
+ public void start(Queue<T> queue, Droid<T> droid)
+ {
+ if (log.isInfoEnabled()) {
+ log.info("Start the executor service.");
+ }
+
+ state = ExecutionState.RUNNING;
+
+ if (pool == null) {
+ this.pool = new TaskExecutorPool();
+ }
+
+ for (int i = 0; i < poolSize; i++) {
+ pool.execute(new TaskExecutor(droid));
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ @Override
+ public void stop()
+ {
+ // debug
+ if (log.isInfoEnabled()) {
+ log.info("Stop the executor service.");
+ }
+
+ state = ExecutionState.STOPPED;
+
+ // Disable new tasks from being submitted
+ pool.shutdown();
+
+ // Wait a while for existing tasks to terminate
+ try {
+ if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
+
+ // Cancel currently executing tasks
+ pool.shutdownNow();
+
+ // Wait a while for to respond to being canceled
+ if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
+ if (log.isInfoEnabled()) {
+ log.info("Scheduler did not stop.");
+ }
+ }
+ }
+ } catch (InterruptedException ex) {
+
+ if (log.isInfoEnabled()) {
+ log.info("Force scheduler to stop.");
+ }
+
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+
+ // debug
+ if (log.isInfoEnabled()) {
+ log.info("Scheduler stopped.");
+ }
+
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException
+ {
+ return pool.awaitTermination(timeout, unit);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ @Override
+ public ExecutionState getExecutionState()
+ {
+ return state;
+ }
+
+ /**
+ * @return
+ * @inheritDoc
+ */
+ public WorkMonitor<T> getMonitor()
+ {
+ return monitor;
+ }
+
+ /**
+ * @param monitor
+ * @inheritDoc
+ */
+ public void setMonitor(WorkMonitor<T> monitor)
+ {
+ if (state == ExecutionState.RUNNING) {
+ throw new IllegalStateException("The TaskMaster must be stopped to set a
Monitor.");
+ }
+ this.monitor = monitor;
+ }
+
+ @Override
+ public void setExceptionHandler(TaskExceptionHandler exceptionHandler)
+ {
+ this.exceptionHandler = exceptionHandler;
+ }
+
+ @Override
+ public void setDelayTimer(DelayTimer delayTimer)
+ {
+ this.delayTimer = delayTimer;
+ }
+
+ @Override
+ public Date getFinishedWorking()
+ {
+ return endTime;
+ }
+
+ @Override
+ public T getLastCompletedTask()
+ {
+ return lastCompletedTask;
+ }
+
+ @Override
+ public long getCompletedTasks()
+ {
+ return completedTasks.get();
+ }
+
+ @Override
+ public Date getStartTime()
+ {
+ return startTime;
+ }
+
+ /**
+ * Sets the pool size
+ *
+ * @return
+ */
+ public int getPoolSize()
+ {
+ return poolSize;
+ }
+
+ /**
+ * Returns the size of the pool
+ *
+ * @param poolSize
+ */
+ public void setPoolSize(int poolSize)
+ {
+ pool.setCorePoolSize(this.poolSize = poolSize);
+ }
+
+ private class TaskExecutorPool extends ThreadPoolExecutor
+ {
+
+ private static final long KEEP_ALIVE = 50000L;
+
+ public TaskExecutorPool()
+ {
+ super(poolSize, poolSize, KEEP_ALIVE, TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<Runnable>());
+ this.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable thrwbl)
+ {
+ super.afterExecute(r, thrwbl);
+
+ // try to reexecute the task runner while
+ // the task queue is not empty and while the pool
+ // is still completing the execution of tasks.
+ TaskExecutor taskExecutor = (TaskExecutor) r;
+ while (taskExecutor.getQueue().size() > 0 || getQueue().size() > 0) {
+ if (taskExecutor.getQueue().size() > 0) {
+ execute(r);
+ return;
+ }
+ try {
+ Thread.sleep(TICKLE_TIME);
+ } catch (InterruptedException e) {
+ log.error(e);
+ }
+ }
+
+ taskExecutor.getDroid().finished();
+ state = ExecutionState.COMPLETED;
+ shutdownNow();
+
+ }
+ }
+
+ private class TaskExecutor implements Runnable
+ {
+
+ private final Droid<T> droid;
+ private final Queue<T> queue;
+ private final Worker<T> worker;
+
+ public TaskExecutor(Droid<T> droid)
+ {
+ this.droid = droid;
+ this.queue = droid.getQueue();
+ this.worker = droid.getNewWorker();
+ }
+
+ public Droid<T> getDroid()
+ {
+ return droid;
+ }
+
+ public Queue<T> getQueue()
+ {
+ return queue;
+ }
+
+ public Worker getWorker()
+ {
+ return worker;
+ }
+
+ @Override
+ public void run()
+ {
+ // poll the last task
+ T task = queue.poll();
+
+ // execute the task
+ if (task != null) {
+ try {
+ // monitor the execution of the task
+ if (monitor != null) {
+ monitor.beforeExecute(task, worker);
+ }
+
+ // debug
+ if (log.isDebugEnabled()) {
+ log.debug("Worker [" + worker + "] execute task [" + task + "].");
+ }
+
+ // execute the task
+ worker.execute(task);
+
+ // debug
+ if (log.isDebugEnabled()) {
+ log.debug("Worker [" + worker + "] executed task [" + task + "]
with success.");
+ }
+
+ // monitor the execution of the task
+ if (monitor != null) {
+ monitor.afterExecute(task, worker, null);
+ }
+
+ // set the monitored variables
+ completedTasks.incrementAndGet();
+ lastCompletedTask = task;
+
+ } catch (Exception ex) {
+ // debug
+ if (log.isDebugEnabled()) {
+ log.debug("Worker [" + worker + "] executed task [" + task + "]
without success.");
+ }
+
+ // debug
+ if (log.isErrorEnabled()) {
+ log.error(ex);
+ }
+
+ // monitor the exception
+ if (monitor != null) {
+ monitor.afterExecute(task, worker, ex);
+ }
+
+ // handler the exception
+ if (ex != null) {
+ TaskExceptionResult result = exceptionHandler.handleException(ex);
+
+ // stop the execution in case of a fatal exception
+ if (TaskExceptionResult.FATAL.equals(result)) {
+ state = ExecutionState.STOPPED;
+ }
+
+ droid.finished();
+ pool.shutdownNow();
+
+ }
+ }
+ }
+ }
+ }
+}