Revision: 18567
http://sourceforge.net/p/gate/code/18567
Author: johann_p
Date: 2015-02-10 13:20:37 +0000 (Tue, 10 Feb 2015)
Log Message:
-----------
Make use of the recent changes in GATE that allow the automatic invocation of the
controller callback methods to be disabled, so that the methods can be called
explicitly instead. GCP now runs the controllerExecutionStarted() method for each
duplicated controller in sequence, after every controller has been assigned its
own corpus.
After all documents have been processed, GCP runs the
controllerExecutionFinished() method for each controller in sequence, while each
controller still has its corpus assigned. The corpora are removed from the
controllers only after all the controllerExecutionFinished() methods have been
invoked.
GCP never invokes the controllerExecutionAborted() method, because no processing
error actually aborts the processing of a controller.
This change also ensures that each controller stays associated with the same
corpus throughout. It also adds a suffix to each corpus name that gives the
number of the corpus, e.g. GCPProcessorCorpus_3.
Modified Paths:
--------------
gcp/trunk/src/gate/cloud/batch/PooledDocumentProcessor.java
gcp/trunk/src/gate/cloud/util/GateResourcePool.java
Modified: gcp/trunk/src/gate/cloud/batch/PooledDocumentProcessor.java
===================================================================
--- gcp/trunk/src/gate/cloud/batch/PooledDocumentProcessor.java 2015-02-10
02:20:49 UTC (rev 18566)
+++ gcp/trunk/src/gate/cloud/batch/PooledDocumentProcessor.java 2015-02-10
13:20:37 UTC (rev 18567)
@@ -13,13 +13,13 @@
import gate.Corpus;
import gate.CorpusController;
-import gate.Document;
import gate.Factory;
import gate.cloud.io.DocumentData;
import gate.cloud.io.InputHandler;
import gate.cloud.io.OutputHandler;
import gate.cloud.io.StreamingInputHandler;
import gate.cloud.util.GateResourcePool;
+import gate.creole.AbstractController;
import gate.creole.ExecutionException;
import gate.creole.ResourceInstantiationException;
import gate.util.Benchmark;
@@ -27,6 +27,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
@@ -58,7 +59,7 @@
/**
* ID used as the basis for benchmark IDs.
*/
- private String id = "PooledGCPProcessor_" + uniqueNumber++;
+ private final String id = "PooledGCPProcessor_" + uniqueNumber++;
/**
* The number of threads used to process documents.
@@ -100,20 +101,23 @@
/**
* Pool holding corpora.
*/
- private GateResourcePool<Corpus> corpusPool;
+ //private GateResourcePool<Corpus> corpusPool;
/**
* Create a processor using the given number of threads to process
* documents.
+ * @param poolSize the size of the pool to create.
*/
public PooledDocumentProcessor(int poolSize) {
this.poolSize = poolSize;
}
+ @Override
public void setOutputHandlers(List<OutputHandler> outputHandlers) {
this.outputHandlers = outputHandlers;
}
+ @Override
public void setController(CorpusController c) {
templateController = c;
}
@@ -122,26 +126,49 @@
/* (non-Javadoc)
* @see
gate.sam.batch.DocumentProcessor#setInputHandler(gate.cloud.InputHandler)
*/
+ @Override
public void setInputHandler(InputHandler handler) {
this.inputHandler = handler;
}
+ @Override
public void setExecutor(Executor executor) {
this.executor = executor;
}
+ @Override
public void setResultQueue(BlockingQueue<ProcessResult> resultQueue) {
this.resultQueue = resultQueue;
}
+ @Override
public void init() throws ResourceInstantiationException {
// create the application pool
appPool = new GateResourcePool<CorpusController>();
appPool.fillPool(templateController, poolSize);
- corpusPool = new GateResourcePool<Corpus>();
- corpusPool.fillPool(Factory.newCorpus("GCPProcessor corpus"),
- poolSize);
+ // JP(20150210) we do not use a separate pool for the corpora any more.
Instead we go through
+ // the controllers we just created and for each, we add a new corpus
instance to it.
+ int i = 0;
+ //while(ctIt.hasNext()) {
+ for(CorpusController ct : appPool) {
+ ct.setCorpus(Factory.newCorpus("GCPProcessorCorpus_"+i));
+ i++;
+ }
+
+ // Now that all controllers have got their own corpus each, we go through
the controllers
+ // again and invoke the controllerExecutionStarted() callback method for
each.
+ // We also disable the automatic callbacks and thus prevent the callbacks
to get run for
+ // every individual document.
+ for(CorpusController ct : appPool) {
+ // disable the callbacks
+ ((AbstractController)ct).setControllerCallbacksEnabled(false);
+ try {
+ ((AbstractController)ct).invokeControllerExecutionStarted();
+ } catch (ExecutionException ex) {
+ log.error(id+": Exception when executing the
controllerExecutionStarted method for controller "+ct.getName(), ex);
+ }
+ }
}
/**
@@ -272,35 +299,27 @@
*/
private void processDocumentWithGATE(DocumentData docData, CorpusController
controller)
throws GateException, InterruptedException {
- try {
- Corpus myCorpus = corpusPool.take();
- if(myCorpus != null) {
- try {
- myCorpus.clear();
- myCorpus.add(docData.document);
- controller.setCorpus(myCorpus);
- // do benchmark logging if it has been enabled externally
- String bid = Benchmark.createBenchmarkId(id,
docData.document.getName());
- bid = Benchmark.createBenchmarkId("runApplication", bid);
+ Corpus myCorpus = controller.getCorpus();
+ if (myCorpus != null) {
+ try {
+ myCorpus.clear();
+ myCorpus.add(docData.document);
- // store the running time in a document feature
- long startTime = System.currentTimeMillis();
- Benchmark.executeWithBenchmarking(controller, bid, this, null);
- long timeTaken = System.currentTimeMillis() - startTime;
- docData.processingTime = timeTaken;
- }
- finally {
- myCorpus.clear();
- corpusPool.release(myCorpus);
- }
+ // do benchmark logging if it has been enabled externally
+ String bid = Benchmark.createBenchmarkId(id,
docData.document.getName());
+ bid = Benchmark.createBenchmarkId("runApplication", bid);
+
+ // store the running time in a document feature
+ long startTime = System.currentTimeMillis();
+ Benchmark.executeWithBenchmarking(controller, bid, this, null);
+ long timeTaken = System.currentTimeMillis() - startTime;
+ docData.processingTime = timeTaken;
+ } finally {
+ myCorpus.clear();
}
- else {
- throw new ExecutionException("Internal error: no corpus available");
- }
+ } else {
+ throw new ExecutionException("Internal error: no corpus available");
}
- finally {
- controller.setCorpus(null);
- }
}
/**
@@ -341,7 +360,22 @@
}
public void dispose() {
- log.info("Cleaning up PooledGCPProcessor");
+ log.info("Cleaning up PooledGCPProcessor");
+ // Run the controller callback method controllerExecutionFinished for all
controllers.
+ for(CorpusController ct : appPool) {
+ try {
+ ((AbstractController)ct).invokeControllerExecutionFinished();
+ } catch (ExecutionException ex) {
+ log.error(id+": Exception when executing the
controllerExecutionFinished method for controller "+ct.getName(), ex);
+ }
+ }
+ // Now dispose of all the corpora
+ for(CorpusController ct : appPool) {
+ Corpus co = ct.getCorpus();
+ ct.setCorpus(null);
+ Factory.deleteResource(co);
+ }
+
try {
inputHandler.close();
}
@@ -355,8 +389,7 @@
catch(Exception e) {
log.warn("Exception while closing output handler " + oh, e);
}
- }
+ }
appPool.dispose();
- corpusPool.dispose();
}
}
Modified: gcp/trunk/src/gate/cloud/util/GateResourcePool.java
===================================================================
--- gcp/trunk/src/gate/cloud/util/GateResourcePool.java 2015-02-10 02:20:49 UTC
(rev 18566)
+++ gcp/trunk/src/gate/cloud/util/GateResourcePool.java 2015-02-10 13:20:37 UTC
(rev 18567)
@@ -11,26 +11,10 @@
*/
package gate.cloud.util;
-import gate.Controller;
import gate.Factory;
-import gate.FeatureMap;
-import gate.Gate;
-import gate.ProcessingResource;
import gate.Resource;
import gate.creole.*;
-import gate.util.Files;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Iterator;
-import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -40,7 +24,7 @@
* Class representing a pool of independent but equivalent GATE
* Resources.
*/
-public class GateResourcePool<T extends Resource> {
+public class GateResourcePool<T extends Resource> implements Iterable<T> {
private static int uniqueNumber = 1;
private static final Logger log = Logger.getLogger(GateResourcePool.class);
@@ -103,5 +87,10 @@
poolSize--;
}
}
+
+ @Override
+ public Iterator<T> iterator() {
+ return pool.iterator();
+ }
}
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
Dive into the World of Parallel Programming. The Go Parallel Website,
sponsored by Intel and developed in partnership with Slashdot Media, is your
hub for all things parallel software development, from weekly thought
leadership blogs to news, videos, case studies, tutorials and more. Take a
look and join the conversation now. http://goparallel.sourceforge.net/
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs