Author: tobr
Date: Tue Jan 29 09:50:17 2013
New Revision: 1439804
URL: http://svn.apache.org/viewvc?rev=1439804&view=rev
Log:
defined crawler defaults
added logging
added link management
updated the API
Added:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
(with props)
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
(with props)
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
(with props)
Removed:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/MultiThreadedTaskMaster.java
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/SequentialTaskMaster.java
Modified:
incubator/droids/branches/0.2.x-cleanup/droids-core/pom.xml
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/AbstractDroid.java
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/ContentEntity.java
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Droid.java
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/handle/SysoutHandler.java
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/Parser.java
incubator/droids/branches/0.2.x-cleanup/droids-core/src/test/java/org/apache/droids/core/SimpleTask.java
Modified: incubator/droids/branches/0.2.x-cleanup/droids-core/pom.xml
URL:
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/pom.xml?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- incubator/droids/branches/0.2.x-cleanup/droids-core/pom.xml (original)
+++ incubator/droids/branches/0.2.x-cleanup/droids-core/pom.xml Tue Jan 29
09:50:17 2013
@@ -35,7 +35,7 @@
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>droids-core</artifactId>
- <name>Apache Droids Core</name>
+ <name>APACHE DROIDS CORE</name>
<inceptionYear>2007</inceptionYear>
<description>
Apache Droids API and core components
Modified:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/AbstractDroid.java
URL:
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/AbstractDroid.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
---
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/AbstractDroid.java
(original)
+++
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/AbstractDroid.java
Tue Jan 29 09:50:17 2013
@@ -21,6 +21,7 @@ import org.apache.droids.helper.factorie
import org.apache.droids.helper.factories.ParserFactory;
import org.apache.droids.filter.Filter;
import org.apache.droids.parse.Parser;
+import org.apache.droids.taskmaster.MultiThreadedTaskMaster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +35,7 @@ import java.util.concurrent.TimeUnit;
public abstract class AbstractDroid<T extends Task> implements Droid<T> {
protected final Queue<T> queue;
protected final TaskMaster<T> taskMaster;
- protected Fetcher fetcher;
+ protected Fetcher<T> fetcher;
protected ParserFactory parserFactory;
protected FilterFactory filterFactory;
protected HandlerFactory handlerFactory;
@@ -42,12 +43,12 @@ public abstract class AbstractDroid<T ex
protected final static Logger logger =
LoggerFactory.getLogger(AbstractDroid.class);
public AbstractDroid() {
- this(new SimpleTaskQueueWithHistory<T>(), new
MultiThreadedTaskMaster<T>());
+ this(null, null);
}
public AbstractDroid(Queue<T> queue, TaskMaster<T> taskMaster) {
- this.queue = queue;
- this.taskMaster = taskMaster;
+ this.queue = queue == null ? new SimpleTaskQueueWithHistory<T>() :
queue;
+ this.taskMaster = queue == null ? new MultiThreadedTaskMaster<T>() :
taskMaster;
this.parserFactory = new ParserFactory();
this.filterFactory = new FilterFactory();
this.handlerFactory = new HandlerFactory();
@@ -71,11 +72,19 @@ public abstract class AbstractDroid<T ex
@Override
public void add(T task) {
+ logger.debug("add task: " + task.getURI());
queue.add(task);
}
@Override
+ public T filter(T task) {
+ logger.debug("filter task: " + task.getURI());
+ return this.filterFactory.filter(task);
+ }
+
+ @Override
public void load(T task) throws DroidsException, IOException {
+ logger.debug("load task: " + task.getURI());
if (this.fetcher == null) {
throw new DroidsException("Fetcher not set");
} else {
@@ -85,17 +94,20 @@ public abstract class AbstractDroid<T ex
@Override
public void parse(T task) throws DroidsException, IOException {
+ logger.debug("parse task: " + task.getURI());
this.parserFactory.parse(task);
}
@Override
public void handle(T task) throws DroidsException, IOException {
+ logger.debug("handle task: " + task.getURI());
this.handlerFactory.handle(task);
}
@Override
- public T filter(T task) {
- return this.filterFactory.filter(task);
+ public void finish(T task) throws DroidsException, IOException {
+ task.getContentEntity().close();
+ logger.debug("finished task: " + task.getURI());
}
public void setFetcher(Fetcher fetcher) {
Modified:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/ContentEntity.java
URL:
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/ContentEntity.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
---
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/ContentEntity.java
(original)
+++
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/ContentEntity.java
Tue Jan 29 09:50:17 2013
@@ -1,8 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.droids.core;
+import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
/**
*
@@ -13,7 +31,7 @@ public class ContentEntity {
private Map<String, Object> data;
public final static String CONTENT = "content";
- public final static String MIME_TYPE = "mime";
+ public final static String CONTENT_TYPE = "content-type";
public final static String CONTENT_LENGTH = "content-length";
public final static String LINKS = "links";
@@ -33,15 +51,31 @@ public class ContentEntity {
this.put(CONTENT, in);
}
+ public <T extends Task> void setLinks(Set<T> links) {
+ this.put(LINKS, links);
+ }
+
public InputStream getContent() throws DroidsException {
if (this.getValue(CONTENT) != null) {
if (this.getValue(CONTENT) instanceof InputStream) {
return (InputStream)this.getValue(CONTENT);
} else {
- throw new DroidsException("wrong type of content");
+ throw new DroidsException("no content available");
+ }
+ } else {
+ return null;
+ }
+ }
+
+ public <T extends Task> Set<T> getLinks() throws DroidsException {
+ if (this.getValue(LINKS) != null) {
+ if (this.getValue(LINKS) instanceof Set) {
+ return (Set<T>)this.getValue(LINKS);
+ } else {
+ throw new DroidsException("no set of links available");
}
} else {
- throw new NullPointerException();
+ return null;
}
}
@@ -49,4 +83,10 @@ public class ContentEntity {
return data;
}
+ public void close() throws DroidsException, IOException {
+ if (this.getContent() != null) {
+ this.getContent().close();
+ }
+ }
+
}
Modified:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Droid.java
URL:
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Droid.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
---
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Droid.java
(original)
+++
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Droid.java
Tue Jan 29 09:50:17 2013
@@ -78,6 +78,15 @@ public interface Droid<T extends Task> {
public void handle(T task) throws DroidsException, IOException;
/**
+ * Finish the task.
+ * Close resources like InputStreams, perform monitoring and clean up the
task.
+ *
+ * @param task the task to finish
+ */
+ public void finish(T task) throws DroidsException, IOException;
+
+
+ /**
* Filter the task.
*
* @param task the task to filter
Modified:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
URL:
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
---
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
(original)
+++
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
Tue Jan 29 09:50:17 2013
@@ -59,4 +59,6 @@ public interface Task extends Serializab
public void abort();
public boolean isAborted();
+
+ public Task createTask(URI uri);
}
Modified:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/handle/SysoutHandler.java
URL:
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/handle/SysoutHandler.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
---
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/handle/SysoutHandler.java
(original)
+++
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/handle/SysoutHandler.java
Tue Jan 29 09:50:17 2013
@@ -23,6 +23,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
+import java.util.HashSet;
import java.util.Set;
/**
@@ -36,7 +37,7 @@ public class SysoutHandler extends Write
private static final Logger logger =
LoggerFactory.getLogger(SysoutHandler.class);
public SysoutHandler() {
- super();
+ this(new HashSet<String>());
}
public SysoutHandler(Set<String> attributes) {
@@ -53,7 +54,7 @@ public class SysoutHandler extends Write
@Override
public void handle(Task task) throws IOException, DroidsException {
for (String key : task.getContentEntity().getData().keySet()) {
- if (attributes.contains(key)) {
+ if (attributes.contains(key) || attributes.isEmpty()) {
if (task.getContentEntity().getValue(key) instanceof
InputStream) {
InputStream instream = (InputStream)
task.getContentEntity().getValue(key);
try {
Modified:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/Parser.java
URL:
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/Parser.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
---
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/Parser.java
(original)
+++
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/Parser.java
Tue Jan 29 09:50:17 2013
@@ -26,12 +26,12 @@ import org.apache.droids.core.Task;
*
* @version 1.0
*/
-public interface Parser {
+public interface Parser<T extends Task> {
/**
* Creates the parse for some content.
*
* @param task the task that correspond to the stream
* @return the parse object
*/
- public void parse(Task task) throws DroidsException, IOException;
+ public void parse(T task) throws DroidsException, IOException;
}
Added:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
URL:
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java?rev=1439804&view=auto
==============================================================================
---
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
(added)
+++
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
Tue Jan 29 09:50:17 2013
@@ -0,0 +1,36 @@
+package org.apache.droids.parse;
+
+import org.apache.droids.core.DroidsException;
+import org.apache.droids.core.Task;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
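+ * Extracts the href targets of HTML anchor tags from a task's content using a
+ * regular expression, resolves them against the task URI and stores the
+ * resulting tasks as the links of the task's content entity.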
+ */
+public class SimpleLinkParser<T extends Task> implements Parser<T> {
+
+ @Override
+ public void parse(T task) throws DroidsException, IOException {
+ InputStream inStream = task.getContentEntity().getContent();
+ if (inStream != null) {
+ Scanner s = new Scanner(inStream).useDelimiter("\\A");
+ String content = s.hasNext() ? s.next() : "";
+ Pattern linkPattern =
Pattern.compile("<a[^>]+href=[\"']?([^\"'>]+)[\"']?[^>]*>(.+?)</a>",
Pattern.CASE_INSENSITIVE|Pattern.DOTALL);
+ Matcher pageMatcher = linkPattern.matcher(content);
+ Set<Task> links = new HashSet<Task>();
+ while(pageMatcher.find()){
+
links.add(task.createTask(task.getURI().resolve(pageMatcher.group(1))));
+ }
+ task.getContentEntity().setLinks(links);
+ }
+ }
+}
Propchange:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
URL:
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java?rev=1439804&view=auto
==============================================================================
---
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
(added)
+++
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
Tue Jan 29 09:50:17 2013
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.droids.taskmaster;
+
+import java.util.Date;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.droids.core.Droid;
+import org.apache.droids.core.Task;
+import org.apache.droids.core.TaskMaster;
+import org.apache.droids.core.Worker;
+import org.apache.droids.delay.DelayTimer;
+import org.apache.droids.exception.TaskExceptionHandler;
+import org.apache.droids.exception.TaskExceptionResult;
+import org.apache.droids.monitor.WorkMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible for running all the tasks
+ */
+public class MultiThreadedTaskMaster<T extends Task> implements TaskMaster<T> {
+
+ protected static final Logger LOG =
LoggerFactory.getLogger(MultiThreadedTaskMaster.class);
+ private static final long TICKLE_TIME = 1000L;
+
+ /**
+ * The execution state
+ */
+ protected volatile ExecutionState state = ExecutionState.STOPPED;
+ /**
+ * The delay timer
+ */
+ protected DelayTimer delayTimer;
+ /**
+ * The start time
+ */
+ protected Date startTime;
+ /**
+ * The end time
+ */
+ protected Date endTime;
+ /**
+ * The last completed task
+ */
+ protected T lastCompletedTask;
+ /**
+ * The completed task counter
+ */
+ protected AtomicLong completedTasks = new AtomicLong();
+ /**
+ * The monitor that records the processing of tasks
+ */
+ protected WorkMonitor<T> monitor;
+ /**
+ * The task exception handler
+ */
+ protected TaskExceptionHandler exceptionHandler;
+
+ /*
+ * The pool size
+ */
+ private int poolSize = 1;
+ /**
+ * The pool
+ */
+ private TaskExecutorPool pool;
+
+ @Override
+ public void start(Queue<T> queue, Droid<T> droid) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Start the executor service.");
+ }
+
+ state = ExecutionState.RUNNING;
+
+ if (pool == null) {
+ this.pool = new TaskExecutorPool();
+ this.pool.setCorePoolSize(this.poolSize);
+ }
+
+ // Stagger startup
+ for (int i = 0; i < poolSize; i++) {
+ try {
+ Thread.sleep(TICKLE_TIME);
+ } catch (InterruptedException ignored) {
+ LOG.error("", ignored);
+ }
+ pool.execute(new TaskExecutor(queue, droid));
+ }
+ }
+
+ /**
+ * Stops the TaskMaster
+ */
+ public void stop() {
+ // debug
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Stop the executor service.");
+ }
+
+ state = ExecutionState.STOPPED;
+
+ // Disable new tasks from being submitted
+ pool.shutdown();
+
+ // Wait a while for existing tasks to terminate
+ try {
+ if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
+
+ // Cancel currently executing tasks
+ pool.shutdownNow();
+
+ // Wait a while for tasks to respond to being canceled
+ if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Scheduler did not stop.");
+ }
+ }
+ }
+ } catch (InterruptedException ex) {
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Force scheduler to stop.");
+ }
+
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+
+ // debug
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Scheduler stopped.");
+ }
+
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
+ return pool.awaitTermination(timeout, unit);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ExecutionState getExecutionState() {
+ return state;
+ }
+
+ /**
+ * @return
+ * @inheritDoc
+ */
+ public WorkMonitor<T> getMonitor() {
+ return monitor;
+ }
+
+ /**
+ * @param monitor
+ * @inheritDoc
+ */
+ public void setMonitor(WorkMonitor<T> monitor) {
+ if (state == ExecutionState.RUNNING) {
+ throw new IllegalStateException("The TaskMaster must be stopped to
set a Monitor.");
+ }
+ this.monitor = monitor;
+ }
+
+ @Override
+ public void setExceptionHandler(TaskExceptionHandler exceptionHandler) {
+ this.exceptionHandler = exceptionHandler;
+ }
+
+ @Override
+ public void setDelayTimer(DelayTimer delayTimer) {
+ this.delayTimer = delayTimer;
+ }
+
+ @Override
+ public Date getFinishedWorking() {
+ return endTime;
+ }
+
+ @Override
+ public T getLastCompletedTask() {
+ return lastCompletedTask;
+ }
+
+ @Override
+ public long getCompletedTasks() {
+ return completedTasks.get();
+ }
+
+ @Override
+ public Date getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * Returns the pool size
+ *
+ * @return the configured pool size
+ */
+ public int getPoolSize() {
+ return poolSize;
+ }
+
+ /**
+ * Sets the size of the pool
+ *
+ * @param poolSize the new pool size; also applied to the pool if it already exists
+ */
+ public void setPoolSize(int poolSize) {
+ this.poolSize = poolSize;
+ if (pool != null)
+ pool.setCorePoolSize(this.poolSize);
+ }
+
+ private class TaskExecutorPool extends ThreadPoolExecutor {
+
+ private static final long KEEP_ALIVE = 50000L;
+
+ public TaskExecutorPool() {
+ super(poolSize, poolSize, KEEP_ALIVE, TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<Runnable>());
+ this.setRejectedExecutionHandler(new
ThreadPoolExecutor.DiscardPolicy());
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable thrwbl) {
+ super.afterExecute(r, thrwbl);
+
+
+ // try to reexecute the task runner while
+ // the task queue is not empty and while the pool
+ // is still completing the execution of tasks.
+ @SuppressWarnings("unchecked")
+ TaskExecutor taskExecutor = (TaskExecutor) r;
+
+ while (taskExecutor.getQueue().size() > 0 || getQueue().size() >
0) {
+ if (taskExecutor.getQueue().size() > 0) {
+ execute(r);
+ return;
+ }
+ try {
+ Thread.sleep(TICKLE_TIME);
+ } catch (InterruptedException e) {
+ LOG.error("", e);
+ }
+ }
+
+ state = ExecutionState.COMPLETED;
+ // If this point is reached, a count of one means this completed
thread
+ if (this.getActiveCount() == 1) {
+
+ // finish droid just once
+ taskExecutor.getDroid().finished();
+ shutdown();
+ }
+
+ }
+ }
+
+ private class TaskExecutor implements Runnable {
+
+ private final Droid<T> droid;
+ private final Queue<T> queue;
+ private final Worker<T> worker;
+
+ public TaskExecutor(Queue queue, Droid<T> droid) {
+ this.droid = droid;
+ this.queue = queue;
+ this.worker = droid.getNewWorker();
+ }
+
+ public Droid<T> getDroid() {
+ return droid;
+ }
+
+ public Queue<T> getQueue() {
+ return queue;
+ }
+
+ @SuppressWarnings("unused")
+ public Worker<? extends Task> getWorker() {
+ return worker;
+ }
+
+ @Override
+ public void run() {
+ // poll the next task
+ T task = queue.poll();
+
+ if (task == null) {
+ try {
+ Thread.sleep(TICKLE_TIME);
+ } catch (InterruptedException e) {
+ LOG.error("", e);
+ }
+ task = queue.poll();
+ }
+
+ // execute the task
+ if (task != null) {
+ try {
+ // monitor the execution of the task
+ if (monitor != null) {
+ monitor.beforeExecute(task, worker);
+ }
+
+ // debug
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Worker [" + worker + "] execute task [" +
task + "].");
+ }
+
+ // execute the task
+ if (!task.isAborted()) {
+ worker.execute(task);
+ }
+
+ // debug
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Worker [" + worker + "] executed task [" +
task + "] with success.");
+ }
+
+ // monitor the execution of the task
+ if (monitor != null) {
+ monitor.afterExecute(task, worker, null);
+ }
+
+ // set the monitored variables
+ completedTasks.incrementAndGet();
+ lastCompletedTask = task;
+
+ } catch (Exception ex) {
+ // debug
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Worker [" + worker + "] executed task [" +
task + "] without success.");
+ }
+
+ // log the error
+ if (LOG.isErrorEnabled()) {
+ LOG.error("", ex);
+ }
+
+ // monitor the exception
+ if (monitor != null) {
+ monitor.afterExecute(task, worker, ex);
+ }
+
+ // handle the exception
+ if (ex != null) {
+ TaskExceptionResult result =
exceptionHandler.handleException(ex);
+
+ // stop the execution in case of a fatal exception
+ if (TaskExceptionResult.FATAL.equals(result)) {
+ state = ExecutionState.STOPPED;
+ droid.finished();
+ pool.shutdownNow();
+ }
+ }
+ }
+ }
+ }
+ }
+}
Propchange:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
URL:
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java?rev=1439804&view=auto
==============================================================================
---
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
(added)
+++
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
Tue Jan 29 09:50:17 2013
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.droids.taskmaster;
+
+import java.util.Date;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.droids.core.Droid;
+import org.apache.droids.core.Task;
+import org.apache.droids.core.TaskMaster;
+import org.apache.droids.core.Worker;
+import org.apache.droids.delay.DelayTimer;
+import org.apache.droids.exception.TaskExceptionHandler;
+import org.apache.droids.exception.TaskExceptionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequentialTaskMaster<T extends Task> implements TaskMaster<T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SequentialTaskMaster.class);
+ private final Object mutex;
+ private volatile boolean completed;
+ private volatile Date startedWorking = null;
+ private volatile Date finishedWorking = null;
+ private volatile int completedTask = 0;
+ private volatile T lastCompletedTask = null;
+ private volatile ExecutionState state = ExecutionState.INITIALIZED;
+ private DelayTimer delayTimer = null;
+ private TaskExceptionHandler exHandler = null;
+
+ public SequentialTaskMaster() {
+ super();
+ this.mutex = new Object();
+ }
+
+ /**
+ * The queue has been initialized
+ */
+ @Override
+ public synchronized void start(final Queue<T> queue, final Droid<T> droid)
{
+ this.completed = false;
+ this.startedWorking = new Date();
+ this.finishedWorking = null;
+ this.completedTask = 0;
+ this.state = ExecutionState.RUNNING;
+
+ boolean terminated = false;
+ while (!terminated) {
+ T task = queue.poll();
+ if (task == null) {
+ break;
+ }
+ if (delayTimer != null) {
+ long delay = delayTimer.getDelayMillis();
+ if (delay > 0) {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ Worker<T> worker = droid.getNewWorker();
+ try {
+ if (!task.isAborted()) {
+ worker.execute(task);
+ }
+ completedTask++;
+ lastCompletedTask = task;
+ } catch (Exception ex) {
+ TaskExceptionResult result = TaskExceptionResult.WARN;
+ if (exHandler != null) {
+ result = exHandler.handleException(ex);
+ }
+ switch (result) {
+ case WARN:
+ LOG.warn(ex.toString() + " " + task.getURI());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(ex.toString(), ex);
+ }
+ break;
+ case FATAL:
+ LOG.error(ex.getMessage(), ex);
+ terminated = true;
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ finishedWorking = new Date();
+ this.state = ExecutionState.STOPPED;
+ droid.finished();
+ synchronized (mutex) {
+ completed = true;
+ mutex.notifyAll();
+ }
+ }
+
+ @Override
+ public final void setExceptionHandler(TaskExceptionHandler exHandler) {
+ this.exHandler = exHandler;
+ }
+
+ @Override
+ public final void setDelayTimer(DelayTimer delayTimer) {
+ this.delayTimer = delayTimer;
+ }
+
+ public boolean isWorking() {
+ return startedWorking != null && finishedWorking == null;
+ }
+
+ @Override
+ public Date getStartTime() {
+ return startedWorking;
+ }
+
+ @Override
+ public Date getFinishedWorking() {
+ return finishedWorking;
+ }
+
+ @Override
+ public long getCompletedTasks() {
+ return completedTask;
+ }
+
+ @Override
+ public T getLastCompletedTask() {
+ return lastCompletedTask;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
+ if (timeout < 0) {
+ timeout = 0;
+ }
+ synchronized (this.mutex) {
+ long deadline = System.currentTimeMillis() +
unit.toMillis(timeout);
+ long remaining = timeout;
+ while (!completed) {
+ this.mutex.wait(remaining);
+ if (timeout >= 0) {
+ remaining = deadline - System.currentTimeMillis();
+ if (remaining <= 0) {
+ return false; // Reach if timeout is over and no
finish.
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public ExecutionState getExecutionState() {
+ return state;
+ }
+}
Propchange:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
incubator/droids/branches/0.2.x-cleanup/droids-core/src/test/java/org/apache/droids/core/SimpleTask.java
URL:
http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/test/java/org/apache/droids/core/SimpleTask.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
---
incubator/droids/branches/0.2.x-cleanup/droids-core/src/test/java/org/apache/droids/core/SimpleTask.java
(original)
+++
incubator/droids/branches/0.2.x-cleanup/droids-core/src/test/java/org/apache/droids/core/SimpleTask.java
Tue Jan 29 09:50:17 2013
@@ -50,4 +50,9 @@ public class SimpleTask implements Task
return this.aborted;
}
+ @Override
+ public Task createTask(URI uri) {
+ return new SimpleTask(uri, this.getDepth());
+ }
+
}