Revision: 18567
          http://sourceforge.net/p/gate/code/18567
Author:   johann_p
Date:     2015-02-10 13:20:37 +0000 (Tue, 10 Feb 2015)
Log Message:
-----------
Make use of the recent changes in GATE that allow the automatic invocation of
the controller callback methods to be disabled, so that they can be called
explicitly instead. GCP now runs the controllerExecutionStarted() method
for each duplicated controller in sequence, after every controller has already
been assigned its own corpus.
After all documents have been processed, GCP runs the
controllerExecutionFinished() method in sequence for each controller, while
each controller still has its corpus assigned. The corpora are only removed
from the controllers after all the controllerExecutionFinished() methods have
been invoked.
The controllerExecutionAborted() method will never be invoked by GCP, since no
processing error actually aborts the processing of a controller.
This also ensures that each controller is always associated with the same
corpus throughout.
This also adds a suffix to each corpus name that indicates the number of the
corpus, e.g. GCPProcessorCorpus_3.

Modified Paths:
--------------
    gcp/trunk/src/gate/cloud/batch/PooledDocumentProcessor.java
    gcp/trunk/src/gate/cloud/util/GateResourcePool.java

Modified: gcp/trunk/src/gate/cloud/batch/PooledDocumentProcessor.java
===================================================================
--- gcp/trunk/src/gate/cloud/batch/PooledDocumentProcessor.java 2015-02-10 
02:20:49 UTC (rev 18566)
+++ gcp/trunk/src/gate/cloud/batch/PooledDocumentProcessor.java 2015-02-10 
13:20:37 UTC (rev 18567)
@@ -13,13 +13,13 @@
 
 import gate.Corpus;
 import gate.CorpusController;
-import gate.Document;
 import gate.Factory;
 import gate.cloud.io.DocumentData;
 import gate.cloud.io.InputHandler;
 import gate.cloud.io.OutputHandler;
 import gate.cloud.io.StreamingInputHandler;
 import gate.cloud.util.GateResourcePool;
+import gate.creole.AbstractController;
 import gate.creole.ExecutionException;
 import gate.creole.ResourceInstantiationException;
 import gate.util.Benchmark;
@@ -27,6 +27,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
@@ -58,7 +59,7 @@
   /**
    * ID used as the basis for benchmark IDs.
    */
-  private String id = "PooledGCPProcessor_" + uniqueNumber++;
+  private final String id = "PooledGCPProcessor_" + uniqueNumber++;
 
   /**
    * The number of threads used to process documents.
@@ -100,20 +101,23 @@
   /**
    * Pool holding corpora.
    */
-  private GateResourcePool<Corpus> corpusPool;
+  //private GateResourcePool<Corpus> corpusPool;
 
   /**
    * Create a processor using the given number of threads to process
    * documents.
+   * @param poolSize the size of the pool to create.
    */
   public PooledDocumentProcessor(int poolSize) {
     this.poolSize = poolSize;
   }
 
+  @Override
   public void setOutputHandlers(List<OutputHandler> outputHandlers) {
     this.outputHandlers = outputHandlers;
   }
 
+  @Override
   public void setController(CorpusController c) {
     templateController = c;
   }
@@ -122,26 +126,49 @@
   /* (non-Javadoc)
    * @see 
gate.sam.batch.DocumentProcessor#setInputHandler(gate.cloud.InputHandler)
    */
+  @Override
   public void setInputHandler(InputHandler handler) {
     this.inputHandler = handler;
   }
 
+  @Override
   public void setExecutor(Executor executor) {
     this.executor = executor;
   }
 
+  @Override
   public void setResultQueue(BlockingQueue<ProcessResult> resultQueue) {
     this.resultQueue = resultQueue;
   }
 
+  @Override
   public void init() throws ResourceInstantiationException {
     // create the application pool
     appPool = new GateResourcePool<CorpusController>();
     appPool.fillPool(templateController, poolSize);
 
-    corpusPool = new GateResourcePool<Corpus>();
-    corpusPool.fillPool(Factory.newCorpus("GCPProcessor corpus"),
-            poolSize);
+    // JP(20150210) we do not use a separate pool for the corpora any more. 
Instead we go through
+    // the controllers we just created and for each, we add a new corpus 
instance to it.
+    int i = 0;
+    //while(ctIt.hasNext()) {
+    for(CorpusController ct : appPool) {
+      ct.setCorpus(Factory.newCorpus("GCPProcessorCorpus_"+i));
+      i++;
+    }
+    
+    // Now that all controllers have got their own corpus each, we go through 
the controllers
+    // again and invoke the controllerExecutionStarted() callback method for 
each.
+    // We also disable the automatic callbacks and thus prevent the callbacks 
to get run for 
+    // every individual document.
+    for(CorpusController ct : appPool) {
+        // disable the callbacks
+        ((AbstractController)ct).setControllerCallbacksEnabled(false);
+        try {
+          ((AbstractController)ct).invokeControllerExecutionStarted();
+        } catch (ExecutionException ex) {
+          log.error(id+": Exception when executing the 
controllerExecutionStarted method for controller "+ct.getName(), ex);
+        }
+    }
   }
 
   /**
@@ -272,35 +299,27 @@
    */
   private void processDocumentWithGATE(DocumentData docData, CorpusController 
controller)
           throws GateException, InterruptedException {
-    try {
-      Corpus myCorpus = corpusPool.take();
-      if(myCorpus != null) {
-        try {
-          myCorpus.clear();
-          myCorpus.add(docData.document);
-          controller.setCorpus(myCorpus);
-          // do benchmark logging if it has been enabled externally
-          String bid = Benchmark.createBenchmarkId(id, 
docData.document.getName());
-          bid = Benchmark.createBenchmarkId("runApplication", bid);
+    Corpus myCorpus = controller.getCorpus();
+    if (myCorpus != null) {
+      try {
+        myCorpus.clear();
+        myCorpus.add(docData.document);
 
-          // store the running time in a document feature
-          long startTime = System.currentTimeMillis();
-          Benchmark.executeWithBenchmarking(controller, bid, this, null);
-          long timeTaken = System.currentTimeMillis() - startTime;
-          docData.processingTime = timeTaken;
-        }
-        finally {
-          myCorpus.clear();
-          corpusPool.release(myCorpus);
-        }
+        // do benchmark logging if it has been enabled externally
+        String bid = Benchmark.createBenchmarkId(id, 
docData.document.getName());
+        bid = Benchmark.createBenchmarkId("runApplication", bid);
+
+        // store the running time in a document feature
+        long startTime = System.currentTimeMillis();
+        Benchmark.executeWithBenchmarking(controller, bid, this, null);
+        long timeTaken = System.currentTimeMillis() - startTime;
+        docData.processingTime = timeTaken;
+      } finally {
+        myCorpus.clear();
       }
-      else {
-        throw new ExecutionException("Internal error: no corpus available");
-      }
+    } else {
+      throw new ExecutionException("Internal error: no corpus available");
     }
-    finally {
-      controller.setCorpus(null);
-    }
   }
 
   /**
@@ -341,7 +360,22 @@
   }
 
   public void dispose() {
-    log.info("Cleaning up PooledGCPProcessor");
+    log.info("Cleaning up PooledGCPProcessor");    
+    // Run the controller callback method controllerExecutionFinished for all 
controllers. 
+    for(CorpusController ct : appPool) {
+      try {
+        ((AbstractController)ct).invokeControllerExecutionFinished();
+      } catch (ExecutionException ex) {
+        log.error(id+": Exception when executing the 
controllerExecutionFinished method for controller "+ct.getName(), ex);
+      }
+    }    
+    // Now dispose of all the corpora 
+    for(CorpusController ct : appPool) {
+      Corpus co = ct.getCorpus();
+      ct.setCorpus(null);
+      Factory.deleteResource(co);
+    }    
+    
     try {
       inputHandler.close();
     }
@@ -355,8 +389,7 @@
       catch(Exception e) {
         log.warn("Exception while closing output handler " + oh, e);
       }
-    }
+    }    
     appPool.dispose();
-    corpusPool.dispose();
   }
 }

Modified: gcp/trunk/src/gate/cloud/util/GateResourcePool.java
===================================================================
--- gcp/trunk/src/gate/cloud/util/GateResourcePool.java 2015-02-10 02:20:49 UTC 
(rev 18566)
+++ gcp/trunk/src/gate/cloud/util/GateResourcePool.java 2015-02-10 13:20:37 UTC 
(rev 18567)
@@ -11,26 +11,10 @@
  */
 package gate.cloud.util;
 
-import gate.Controller;
 import gate.Factory;
-import gate.FeatureMap;
-import gate.Gate;
-import gate.ProcessingResource;
 import gate.Resource;
 import gate.creole.*;
-import gate.util.Files;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Iterator;
-import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
@@ -40,7 +24,7 @@
  * Class representing a pool of independent but equivalent GATE
  * Resources.
  */
-public class GateResourcePool<T extends Resource> {
+public class GateResourcePool<T extends Resource>  implements Iterable<T> {
   private static int uniqueNumber = 1;
   
   private static final Logger log = Logger.getLogger(GateResourcePool.class);
@@ -103,5 +87,10 @@
       poolSize--;
     }
   }
+  
+  @Override
+  public Iterator<T> iterator() {
+    return pool.iterator();
+  }
 
 }

This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.


------------------------------------------------------------------------------
Dive into the World of Parallel Programming. The Go Parallel Website,
sponsored by Intel and developed in partnership with Slashdot Media, is your
hub for all things parallel software development, from weekly thought
leadership blogs to news, videos, case studies, tutorials and more. Take a
look and join the conversation now. http://goparallel.sourceforge.net/
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs

Reply via email to