Author: tobr
Date: Tue Jan 29 09:50:17 2013
New Revision: 1439804

URL: http://svn.apache.org/viewvc?rev=1439804&view=rev
Log:
defined crawler defaults
added logging 
added link management
updated the API

Added:
    
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
   (with props)
    
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/
    
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
   (with props)
    
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
   (with props)
Removed:
    
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/MultiThreadedTaskMaster.java
    
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/SequentialTaskMaster.java
Modified:
    incubator/droids/branches/0.2.x-cleanup/droids-core/pom.xml
    
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/AbstractDroid.java
    
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/ContentEntity.java
    
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Droid.java
    
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
    
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/handle/SysoutHandler.java
    
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/Parser.java
    
incubator/droids/branches/0.2.x-cleanup/droids-core/src/test/java/org/apache/droids/core/SimpleTask.java

Modified: incubator/droids/branches/0.2.x-cleanup/droids-core/pom.xml
URL: 
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/pom.xml?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- incubator/droids/branches/0.2.x-cleanup/droids-core/pom.xml (original)
+++ incubator/droids/branches/0.2.x-cleanup/droids-core/pom.xml Tue Jan 29 
09:50:17 2013
@@ -35,7 +35,7 @@
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>droids-core</artifactId>
-    <name>Apache Droids Core</name>
+    <name>APACHE DROIDS CORE</name>
     <inceptionYear>2007</inceptionYear>
     <description>
         Apache Droids API and core components

Modified: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/AbstractDroid.java
URL: 
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/AbstractDroid.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/AbstractDroid.java
 (original)
+++ 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/AbstractDroid.java
 Tue Jan 29 09:50:17 2013
@@ -21,6 +21,7 @@ import org.apache.droids.helper.factorie
 import org.apache.droids.helper.factories.ParserFactory;
 import org.apache.droids.filter.Filter;
 import org.apache.droids.parse.Parser;
+import org.apache.droids.taskmaster.MultiThreadedTaskMaster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +35,7 @@ import java.util.concurrent.TimeUnit;
 public abstract class AbstractDroid<T extends Task> implements Droid<T> {
     protected final Queue<T> queue;
     protected final TaskMaster<T> taskMaster;
-    protected Fetcher fetcher;
+    protected Fetcher<T> fetcher;
     protected ParserFactory parserFactory;
     protected FilterFactory filterFactory;
     protected HandlerFactory handlerFactory;
@@ -42,12 +43,12 @@ public abstract class AbstractDroid<T ex
     protected final static Logger logger = 
LoggerFactory.getLogger(AbstractDroid.class);
 
     public AbstractDroid() {
-        this(new SimpleTaskQueueWithHistory<T>(), new 
MultiThreadedTaskMaster<T>());
+        this(null, null);
     }
 
     public AbstractDroid(Queue<T> queue, TaskMaster<T> taskMaster) {
-        this.queue = queue;
-        this.taskMaster = taskMaster;
+        this.queue = queue == null ? new SimpleTaskQueueWithHistory<T>() : 
queue;
+        this.taskMaster = queue == null ? new MultiThreadedTaskMaster<T>() : 
taskMaster;
         this.parserFactory = new ParserFactory();
         this.filterFactory = new FilterFactory();
         this.handlerFactory = new HandlerFactory();
@@ -71,11 +72,19 @@ public abstract class AbstractDroid<T ex
 
     @Override
     public void add(T task) {
+        logger.debug("add task: " + task.getURI());
         queue.add(task);
     }
 
     @Override
+    public T filter(T task) {
+        logger.debug("filter task: " + task.getURI());
+        return this.filterFactory.filter(task);
+    }
+
+    @Override
     public void load(T task) throws DroidsException, IOException {
+        logger.debug("load task: " + task.getURI());
         if (this.fetcher == null) {
             throw new DroidsException("Fetcher not set");
         } else {
@@ -85,17 +94,20 @@ public abstract class AbstractDroid<T ex
 
     @Override
     public void parse(T task) throws DroidsException, IOException {
+        logger.debug("parse task: " + task.getURI());
         this.parserFactory.parse(task);
     }
 
     @Override
     public void handle(T task) throws DroidsException, IOException {
+        logger.debug("handle task: " + task.getURI());
         this.handlerFactory.handle(task);
     }
 
     @Override
-    public T filter(T task) {
-        return this.filterFactory.filter(task);
+    public void finish(T task) throws DroidsException, IOException {
+        task.getContentEntity().close();
+        logger.debug("finished task: " + task.getURI());
     }
 
     public void setFetcher(Fetcher fetcher) {

Modified: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/ContentEntity.java
URL: 
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/ContentEntity.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/ContentEntity.java
 (original)
+++ 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/ContentEntity.java
 Tue Jan 29 09:50:17 2013
@@ -1,8 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.droids.core;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 /**
  *
@@ -13,7 +31,7 @@ public class ContentEntity {
     private Map<String, Object> data;
 
     public final static String CONTENT = "content";
-    public final static String MIME_TYPE = "mime";
+    public final static String CONTENT_TYPE = "content-type";
     public final static String CONTENT_LENGTH = "content-length";
     public final static String LINKS = "links";
 
@@ -33,15 +51,31 @@ public class ContentEntity {
         this.put(CONTENT, in);
     }
 
+    public <T extends Task> void setLinks(Set<T> links) {
+        this.put(LINKS, links);
+    }
+
     public InputStream getContent() throws DroidsException {
         if (this.getValue(CONTENT) != null) {
             if (this.getValue(CONTENT) instanceof InputStream) {
                 return (InputStream)this.getValue(CONTENT);
             } else {
-                throw new DroidsException("wrong type of content");
+                throw new DroidsException("no content available");
+            }
+        } else {
+            return null;
+        }
+    }
+
+    public <T extends Task> Set<T> getLinks() throws DroidsException {
+        if (this.getValue(LINKS) != null) {
+            if (this.getValue(LINKS) instanceof Set) {
+                return (Set<T>)this.getValue(LINKS);
+            } else {
+                throw new DroidsException("no set of links available");
             }
         } else {
-            throw new NullPointerException();
+            return null;
         }
     }
 
@@ -49,4 +83,10 @@ public class ContentEntity {
         return data;
     }
 
+    public void close() throws DroidsException, IOException {
+        if (this.getContent() != null) {
+            this.getContent().close();
+        }
+    }
+
 }

Modified: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Droid.java
URL: 
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Droid.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Droid.java
 (original)
+++ 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Droid.java
 Tue Jan 29 09:50:17 2013
@@ -78,6 +78,15 @@ public interface Droid<T extends Task> {
     public void handle(T task) throws DroidsException, IOException;
 
     /**
+     * Finish the task.
+     * Close resources like InputStreams, perform monitoring and clean up the 
task.
+     *
+     * @param task the task to handle
+     */
+    public void finish(T task) throws DroidsException, IOException;
+
+
+    /**
      * Filter the task.
      *
      * @param task the task to filter

Modified: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
URL: 
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
 (original)
+++ 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
 Tue Jan 29 09:50:17 2013
@@ -59,4 +59,6 @@ public interface Task extends Serializab
     public void abort();
 
     public boolean isAborted();
+
+    public Task createTask(URI uri);
 }

Modified: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/handle/SysoutHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/handle/SysoutHandler.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/handle/SysoutHandler.java
 (original)
+++ 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/handle/SysoutHandler.java
 Tue Jan 29 09:50:17 2013
@@ -23,6 +23,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
+import java.util.HashSet;
 import java.util.Set;
 
 /**
@@ -36,7 +37,7 @@ public class SysoutHandler extends Write
     private static final Logger logger = 
LoggerFactory.getLogger(SysoutHandler.class);
 
     public SysoutHandler() {
-        super();
+        this(new HashSet<String>());
     }
 
     public SysoutHandler(Set<String> attributes) {
@@ -53,7 +54,7 @@ public class SysoutHandler extends Write
     @Override
     public void handle(Task task) throws IOException, DroidsException {
         for (String key : task.getContentEntity().getData().keySet()) {
-            if (attributes.contains(key)) {
+            if (attributes.contains(key) || attributes.isEmpty()) {
                 if (task.getContentEntity().getValue(key) instanceof 
InputStream) {
                     InputStream instream = (InputStream) 
task.getContentEntity().getValue(key);
                     try {

Modified: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/Parser.java
URL: 
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/Parser.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/Parser.java
 (original)
+++ 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/Parser.java
 Tue Jan 29 09:50:17 2013
@@ -26,12 +26,12 @@ import org.apache.droids.core.Task;
  *
  * @version 1.0
  */
-public interface Parser {
+public interface Parser<T extends Task> {
     /**
      * Creates the parse for some content.
      *
      * @param task the task that correspond to the stream
      * @return the parse object
      */
-    public void parse(Task task) throws DroidsException, IOException;
+    public void parse(T task) throws DroidsException, IOException;
 }

Added: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
URL: 
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java?rev=1439804&view=auto
==============================================================================
--- 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
 (added)
+++ 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
 Tue Jan 29 09:50:17 2013
@@ -0,0 +1,36 @@
+package org.apache.droids.parse;
+
+import org.apache.droids.core.DroidsException;
+import org.apache.droids.core.Task;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ *
+ *
+ *
+ */
+public class SimpleLinkParser<T extends Task> implements Parser<T> {
+
+    @Override
+    public void parse(T task) throws DroidsException, IOException {
+        InputStream inStream = task.getContentEntity().getContent();
+        if (inStream != null) {
+            Scanner s = new Scanner(inStream).useDelimiter("\\A");
+            String content = s.hasNext() ? s.next() : "";
+            Pattern linkPattern = 
Pattern.compile("<a[^>]+href=[\"']?([^\"'>]+)[\"']?[^>]*>(.+?)</a>",  
Pattern.CASE_INSENSITIVE|Pattern.DOTALL);
+            Matcher pageMatcher = linkPattern.matcher(content);
+            Set<Task> links = new HashSet<Task>();
+            while(pageMatcher.find()){
+                
links.add(task.createTask(task.getURI().resolve(pageMatcher.group(1))));
+            }
+            task.getContentEntity().setLinks(links);
+        }
+    }
+}

Propchange: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
URL: 
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java?rev=1439804&view=auto
==============================================================================
--- 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
 (added)
+++ 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
 Tue Jan 29 09:50:17 2013
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.droids.taskmaster;
+
+import java.util.Date;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.droids.core.Droid;
+import org.apache.droids.core.Task;
+import org.apache.droids.core.TaskMaster;
+import org.apache.droids.core.Worker;
+import org.apache.droids.delay.DelayTimer;
+import org.apache.droids.exception.TaskExceptionHandler;
+import org.apache.droids.exception.TaskExceptionResult;
+import org.apache.droids.monitor.WorkMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible for running all the tasks
+ */
+public class MultiThreadedTaskMaster<T extends Task> implements TaskMaster<T> {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(MultiThreadedTaskMaster.class);
+    private static final long TICKLE_TIME = 1000L;
+
+    /**
+     * The execution state
+     */
+    protected volatile ExecutionState state = ExecutionState.STOPPED;
+    /**
+     * The delay timer
+     */
+    protected DelayTimer delayTimer;
+    /**
+     * The start time
+     */
+    protected Date startTime;
+    /**
+     * The end time
+     */
+    protected Date endTime;
+    /**
+     * The last completed task
+     */
+    protected T lastCompletedTask;
+    /**
+     * The completed task counter
+     */
+    protected AtomicLong completedTasks = new AtomicLong();
+    /**
+     * The monitor that that records the processing of tasks
+     */
+    protected WorkMonitor<T> monitor;
+    /**
+     * The task exception handler
+     */
+    protected TaskExceptionHandler exceptionHandler;
+
+    /*
+     * The pool size
+     */
+    private int poolSize = 1;
+    /**
+     * The pool
+     */
+    private TaskExecutorPool pool;
+
+    @Override
+    public void start(Queue<T> queue, Droid<T> droid) {
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Start the executor service.");
+        }
+
+        state = ExecutionState.RUNNING;
+
+        if (pool == null) {
+            this.pool = new TaskExecutorPool();
+            this.pool.setCorePoolSize(this.poolSize);
+        }
+
+        // Stagger startup
+        for (int i = 0; i < poolSize; i++) {
+            try {
+                Thread.sleep(TICKLE_TIME);
+            } catch (InterruptedException ignored) {
+                LOG.error("", ignored);
+            }
+            pool.execute(new TaskExecutor(queue, droid));
+        }
+    }
+
+    /**
+     * Stops the TaskMaster
+     */
+    public void stop() {
+        // debug
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Stop the executor service.");
+        }
+
+        state = ExecutionState.STOPPED;
+
+        // Disable new tasks from being submitted
+        pool.shutdown();
+
+        // Wait a while for existing tasks to terminate
+        try {
+            if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
+
+                // Cancel currently executing tasks
+                pool.shutdownNow();
+
+                // Wait a while for to respond to being canceled
+                if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
+                    if (LOG.isInfoEnabled()) {
+                        LOG.info("Scheduler did not stop.");
+                    }
+                }
+            }
+        } catch (InterruptedException ex) {
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Force scheduler to stop.");
+            }
+
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+
+        // debug
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Scheduler stopped.");
+        }
+
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+        return pool.awaitTermination(timeout, unit);
+    }
+
+    /**
+     * @inheritDoc
+     */
+    @Override
+    public ExecutionState getExecutionState() {
+        return state;
+    }
+
+    /**
+     * @return
+     * @inheritDoc
+     */
+    public WorkMonitor<T> getMonitor() {
+        return monitor;
+    }
+
+    /**
+     * @param monitor
+     * @inheritDoc
+     */
+    public void setMonitor(WorkMonitor<T> monitor) {
+        if (state == ExecutionState.RUNNING) {
+            throw new IllegalStateException("The TaskMaster must be stopped to 
set a Monitor.");
+        }
+        this.monitor = monitor;
+    }
+
+    @Override
+    public void setExceptionHandler(TaskExceptionHandler exceptionHandler) {
+        this.exceptionHandler = exceptionHandler;
+    }
+
+    @Override
+    public void setDelayTimer(DelayTimer delayTimer) {
+        this.delayTimer = delayTimer;
+    }
+
+    @Override
+    public Date getFinishedWorking() {
+        return endTime;
+    }
+
+    @Override
+    public T getLastCompletedTask() {
+        return lastCompletedTask;
+    }
+
+    @Override
+    public long getCompletedTasks() {
+        return completedTasks.get();
+    }
+
+    @Override
+    public Date getStartTime() {
+        return startTime;
+    }
+
+    /**
+     * Sets the pool size
+     *
+     * @return
+     */
+    public int getPoolSize() {
+        return poolSize;
+    }
+
+    /**
+     * Returns the size of the pool
+     *
+     * @param poolSize
+     */
+    public void setPoolSize(int poolSize) {
+        this.poolSize = poolSize;
+        if (pool != null)
+            pool.setCorePoolSize(this.poolSize);
+    }
+
+    private class TaskExecutorPool extends ThreadPoolExecutor {
+
+        private static final long KEEP_ALIVE = 50000L;
+
+        public TaskExecutorPool() {
+            super(poolSize, poolSize, KEEP_ALIVE, TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue<Runnable>());
+            this.setRejectedExecutionHandler(new 
ThreadPoolExecutor.DiscardPolicy());
+        }
+
+        @Override
+        protected void afterExecute(Runnable r, Throwable thrwbl) {
+            super.afterExecute(r, thrwbl);
+
+
+            // try to reexecute the task runner while
+            // the task queue is not empty and while the pool
+            // is still completing the execution of tasks.
+            @SuppressWarnings("unchecked")
+            TaskExecutor taskExecutor = (TaskExecutor) r;
+
+            while (taskExecutor.getQueue().size() > 0 || getQueue().size() > 
0) {
+                if (taskExecutor.getQueue().size() > 0) {
+                    execute(r);
+                    return;
+                }
+                try {
+                    Thread.sleep(TICKLE_TIME);
+                } catch (InterruptedException e) {
+                    LOG.error("", e);
+                }
+            }
+
+            state = ExecutionState.COMPLETED;
+            // If this point is reached, a count of one means this completed 
thread
+            if (this.getActiveCount() == 1) {
+
+                // finish droid just once
+                taskExecutor.getDroid().finished();
+                shutdown();
+            }
+
+        }
+    }
+
+    private class TaskExecutor implements Runnable {
+
+        private final Droid<T> droid;
+        private final Queue<T> queue;
+        private final Worker<T> worker;
+
+        public TaskExecutor(Queue queue, Droid<T> droid) {
+            this.droid = droid;
+            this.queue = queue;
+            this.worker = droid.getNewWorker();
+        }
+
+        public Droid<T> getDroid() {
+            return droid;
+        }
+
+        public Queue<T> getQueue() {
+            return queue;
+        }
+
+        @SuppressWarnings("unused")
+        public Worker<? extends Task> getWorker() {
+            return worker;
+        }
+
+        @Override
+        public void run() {
+            // poll the last task
+            T task = queue.poll();
+
+            if (task == null) {
+                try {
+                    Thread.sleep(TICKLE_TIME);
+                } catch (InterruptedException e) {
+                    LOG.error("", e);
+                }
+                task = queue.poll();
+            }
+
+            // execute the task
+            if (task != null) {
+                try {
+                    // monitor the execution of the task
+                    if (monitor != null) {
+                        monitor.beforeExecute(task, worker);
+                    }
+
+                    // debug
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Worker [" + worker + "] execute task [" + 
task + "].");
+                    }
+
+                    // execute the task
+                    if (!task.isAborted()) {
+                        worker.execute(task);
+                    }
+
+                    // debug
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Worker [" + worker + "] executed task [" + 
task + "] with success.");
+                    }
+
+                    // monitor the execution of the task
+                    if (monitor != null) {
+                        monitor.afterExecute(task, worker, null);
+                    }
+
+                    // set the monitored variables
+                    completedTasks.incrementAndGet();
+                    lastCompletedTask = task;
+
+                } catch (Exception ex) {
+                    // debug
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Worker [" + worker + "] executed task [" + 
task + "] without success.");
+                    }
+
+                    // debug
+                    if (LOG.isErrorEnabled()) {
+                        LOG.error("", ex);
+                    }
+
+                    // monitor the exception
+                    if (monitor != null) {
+                        monitor.afterExecute(task, worker, ex);
+                    }
+
+                    // handler the exception
+                    if (ex != null) {
+                        TaskExceptionResult result = 
exceptionHandler.handleException(ex);
+
+                        // stop the execution in case of a fatal exception
+                        if (TaskExceptionResult.FATAL.equals(result)) {
+                            state = ExecutionState.STOPPED;
+                            droid.finished();
+                            pool.shutdownNow();
+                        }
+                    }
+                }
+            }
+        }
+    }
+}

Propchange: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
URL: 
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java?rev=1439804&view=auto
==============================================================================
--- 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
 (added)
+++ 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
 Tue Jan 29 09:50:17 2013
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.droids.taskmaster;
+
+import java.util.Date;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.droids.core.Droid;
+import org.apache.droids.core.Task;
+import org.apache.droids.core.TaskMaster;
+import org.apache.droids.core.Worker;
+import org.apache.droids.delay.DelayTimer;
+import org.apache.droids.exception.TaskExceptionHandler;
+import org.apache.droids.exception.TaskExceptionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequentialTaskMaster<T extends Task> implements TaskMaster<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SequentialTaskMaster.class);
+    private final Object mutex;
+    private volatile boolean completed;
+    private volatile Date startedWorking = null;
+    private volatile Date finishedWorking = null;
+    private volatile int completedTask = 0;
+    private volatile T lastCompletedTask = null;
+    private volatile ExecutionState state = ExecutionState.INITIALIZED;
+    private DelayTimer delayTimer = null;
+    private TaskExceptionHandler exHandler = null;
+
+    public SequentialTaskMaster() {
+        super();
+        this.mutex = new Object();
+    }
+
+    /**
+     * The queue has been initialized
+     */
+    @Override
+    public synchronized void start(final Queue<T> queue, final Droid<T> droid) 
{
+        this.completed = false;
+        this.startedWorking = new Date();
+        this.finishedWorking = null;
+        this.completedTask = 0;
+        this.state = ExecutionState.RUNNING;
+
+        boolean terminated = false;
+        while (!terminated) {
+            T task = queue.poll();
+            if (task == null) {
+                break;
+            }
+            if (delayTimer != null) {
+                long delay = delayTimer.getDelayMillis();
+                if (delay > 0) {
+                    try {
+                        Thread.sleep(delay);
+                    } catch (InterruptedException e) {
+                    }
+                }
+            }
+            Worker<T> worker = droid.getNewWorker();
+            try {
+                if (!task.isAborted()) {
+                    worker.execute(task);
+                }
+                completedTask++;
+                lastCompletedTask = task;
+            } catch (Exception ex) {
+                TaskExceptionResult result = TaskExceptionResult.WARN;
+                if (exHandler != null) {
+                    result = exHandler.handleException(ex);
+                }
+                switch (result) {
+                    case WARN:
+                        LOG.warn(ex.toString() + " " + task.getURI());
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(ex.toString(), ex);
+                        }
+                        break;
+                    case FATAL:
+                        LOG.error(ex.getMessage(), ex);
+                        terminated = true;
+                        break;
+                    default:
+                        break;
+                }
+            }
+        }
+        finishedWorking = new Date();
+        this.state = ExecutionState.STOPPED;
+        droid.finished();
+        synchronized (mutex) {
+            completed = true;
+            mutex.notifyAll();
+        }
+    }
+
+    @Override
+    public final void setExceptionHandler(TaskExceptionHandler exHandler) {
+        this.exHandler = exHandler;
+    }
+
+    @Override
+    public final void setDelayTimer(DelayTimer delayTimer) {
+        this.delayTimer = delayTimer;
+    }
+
+    public boolean isWorking() {
+        return startedWorking != null && finishedWorking == null;
+    }
+
+    @Override
+    public Date getStartTime() {
+        return startedWorking;
+    }
+
+    @Override
+    public Date getFinishedWorking() {
+        return finishedWorking;
+    }
+
+    @Override
+    public long getCompletedTasks() {
+        return completedTask;
+    }
+
+    @Override
+    public T getLastCompletedTask() {
+        return lastCompletedTask;
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+        if (timeout < 0) {
+            timeout = 0;
+        }
+        synchronized (this.mutex) {
+            long deadline = System.currentTimeMillis() + 
unit.toMillis(timeout);
+            long remaining = timeout;
+            while (!completed) {
+                this.mutex.wait(remaining);
+                if (timeout >= 0) {
+                    remaining = deadline - System.currentTimeMillis();
+                    if (remaining <= 0) {
+                        return false; // Reach if timeout is over and no 
finish.
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public ExecutionState getExecutionState() {
+        return state;
+    }
+}

Propchange: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/test/java/org/apache/droids/core/SimpleTask.java
URL: 
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/test/java/org/apache/droids/core/SimpleTask.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/test/java/org/apache/droids/core/SimpleTask.java
 (original)
+++ 
incubator/droids/branches/0.2.x-cleanup/droids-core/src/test/java/org/apache/droids/core/SimpleTask.java
 Tue Jan 29 09:50:17 2013
@@ -50,4 +50,9 @@ public class SimpleTask implements Task 
         return this.aborted;
     }
 
+    @Override
+    public Task createTask(URI uri) {
+        return new SimpleTask(uri, this.getDepth());
+    }
+
 }


Reply via email to