Looks good, I hadn't thought of that approach. I was initially just
thinking of adding asyncAssertObject(....) to WorkingMemory interface.
My one concern with this approach is that it doesn't use the
WorkingMemory interface, meaning its not pluggeable - could that come
back to bite us? But I fully understand the dilemma of WorkingMemory
returning values.
With regards to fireAllRules I think a setAsyncStrategy is best. Where
the strategy decides if it calls fireAllRules for each assert, or after
a period of time or after a count value, or a combination. We should
just provide two basic strategies, allow users to impl their own.
I just with my concurrency desing skills were stronger :(
Mark
Greg Barton wrote:
I've attached an example of how I think the asynch
assert should be implemented. It's an asychronous
wrapper of any WorkingMemory that guarantees the
lifecycle methods will be executed in a seperate
thread, in the order they were invoked.
I used the backported concurrency classes to do this:
http://dcl.mathcs.emory.edu/util/backport-util-concurrent
Is this a worthy approach? Method return values and
exception handling are facilitated by the Future
returned by each method. If a FutureTask throws an
exception while executing, it will result in an
ExecutionException being thrown when you call get() to
retrieve the result. Methods that normally return
void now return null from get().
Comments?
GreG
__________________________________________________
Do You Yahoo!?
Tired of spam? Yahoo! Mail has the best spam protection around
http://mail.yahoo.com
------------------------------------------------------------------------
package org.drools.util.concurrent;
import org.drools.FactHandle;
import org.drools.WorkingMemory;
import org.drools.spi.AgendaFilter;
import org.drools.spi.AgendaGroup;
import edu.emory.mathcs.backport.java.util.Queue;
import edu.emory.mathcs.backport.java.util.concurrent.Callable;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.Executors;
import edu.emory.mathcs.backport.java.util.concurrent.Future;
import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
public class ConcurrentWorkingMemory {
private final WorkingMemory workingMemory;
//Gratitously lifted from jdk1.5 javadoc for Executor (SerialExecutor
example)
private final Executor executor = new Executor() {
final Queue tasks = new LinkedBlockingQueue();
final Executor executor = Executors.newSingleThreadExecutor();
Runnable active;
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
}
finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = (Runnable)tasks.poll()) != null) {
executor.execute(active);
}
}
};
public ConcurrentWorkingMemory(WorkingMemory workingMemory) {
super();
this.workingMemory = workingMemory;
}
private Future go(Callable callable) {
FutureTask future = new FutureTask(callable);
executor.execute(future);
return future;
}
public Future fireAllRules() {
return go(new Callable() {
public Object call() {
workingMemory.fireAllRules();
return null;
}
});
}
public Future fireAllRules(final AgendaFilter agendaFilter) {
return go(new Callable() {
public Object call() {
workingMemory.fireAllRules(agendaFilter);
return null;
}
});
}
public Future getObject(final FactHandle handle) {
return go(new Callable() {
public Object call() {
return workingMemory.getObject(handle);
}
});
}
public Future getFactHandle(final Object object) {
return go(new Callable() {
public Object call() {
return workingMemory.getFactHandle(object);
}
});
}
public Future getObjects() {
return go(new Callable() {
public Object call() {
return workingMemory.getObjects();
}
});
}
public Future getObjects(final Class objectClass) {
return go(new Callable() {
public Object call() {
return workingMemory.getObjects(objectClass);
}
});
}
public Future getFocus() {
return go(new Callable() {
public Object call() {
return workingMemory.getFocus();
}
});
}
public Future setFocus(final String focus) {
return go(new Callable() {
public Object call() {
workingMemory.setFocus(focus);
return null;
}
});
}
public Future setFocus(final AgendaGroup focus) {
return go(new Callable() {
public Object call() {
workingMemory.setFocus(focus);
return null;
}
});
}
public Future getFactHandles() {
return go(new Callable() {
public Object call() {
return workingMemory.getFactHandles();
}
});
}
public Future containsObject(final FactHandle handle) {
return go(new Callable() {
public Object call() {
return new
Boolean(workingMemory.containsObject(handle));
}
});
}
public Future assertObject(final Object object) {
return go(new Callable() {
public Object call() {
return workingMemory.assertObject(object);
}
});
}
public Future assertObject(final Object object, final boolean dynamic) {
return go(new Callable() {
public Object call() {
return workingMemory.assertObject(object,
dynamic);
}
});
}
public Future getQueryResults(final String query) {
return go(new Callable() {
public Object call() {
return workingMemory.getQueryResults(query);
}
});
}
public Future retractObject(final FactHandle handle) {
return go(new Callable() {
public Object call() {
workingMemory.retractObject(handle);
return null;
}
});
}
public Future modifyObject(final FactHandle handle, final Object
object) {
return go(new Callable() {
public Object call() {
workingMemory.modifyObject(handle, object);
return null;
}
});
}
public Future clearAgenda() {
return go(new Callable() {
public Object call() {
workingMemory.clearAgenda();
return null;
}
});
}
public Future dispose() {
return go(new Callable() {
public Object call() {
workingMemory.dispose();
return null;
}
});
}
}