Revision: 19372
          http://sourceforge.net/p/gate/code/19372
Author:   ian_roberts
Date:     2016-05-31 16:23:57 +0000 (Tue, 31 May 2016)
Log Message:
-----------
We've never made use of the fact that one BatchRunner could run more than one 
batch at the same time, so this capability has been removed to simplify the code.

Modified Paths:
--------------
    gcp/trunk/src/gate/cloud/batch/BatchRunner.java

Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java
===================================================================
--- gcp/trunk/src/gate/cloud/batch/BatchRunner.java     2016-05-31 16:12:01 UTC (rev 19371)
+++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java     2016-05-31 16:23:57 UTC (rev 19372)
@@ -11,27 +11,31 @@
  */
 package gate.cloud.batch;
 
+import gate.CorpusController;
 import gate.Gate;
 import gate.cloud.batch.BatchJobData.JobState;
 import gate.cloud.batch.ProcessResult.ReturnCode;
+import gate.cloud.io.DocumentEnumerator;
+import gate.cloud.io.IOConstants;
+import gate.cloud.io.InputHandler;
+import gate.cloud.io.OutputHandler;
+import gate.cloud.io.StreamingInputHandler;
 import gate.cloud.util.CLibrary;
 import gate.cloud.util.Tools;
 import gate.cloud.util.XMLBatchParser;
 import gate.creole.ResourceInstantiationException;
 import gate.util.GateException;
+import gate.util.persistence.PersistenceManager;
 
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Hashtable;
-import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
@@ -44,28 +48,18 @@
 import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamWriter;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.log4j.Logger;
-
-import com.sun.jna.Platform;
-import gate.CorpusController;
-import gate.cloud.io.DocumentData;
-import gate.cloud.io.DocumentEnumerator;
-import gate.cloud.io.IOConstants;
-import gate.cloud.io.InputHandler;
-import gate.cloud.io.OutputHandler;
-import gate.cloud.io.StreamingInputHandler;
-import gate.util.persistence.PersistenceManager;
-import java.util.LinkedList;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
 
+import com.sun.jna.Platform;
+
 /**
- * This class is a Batch Runner, i.e. it manages the execution of batch jobs,
- * each of them being specified by a {@link Batch} object. This class is a
- * singleton, so you can only obtain an instance via the getInstance()method.
+ * This class is a Batch Runner, i.e. it manages the execution of a batch job,
+ * specified by a {@link Batch} object.
  */
 public class BatchRunner {
   private static final Logger log = Logger.getLogger(BatchRunner.class);
@@ -173,6 +167,18 @@
                 if(Thread.interrupted()) { return; }
               }
             }
+            // shut down the executor and wait for it to terminate
+            executor.shutdown();
+            while(!executor.isTerminated()) {
+              try {
+                executor.awaitTermination(60L, TimeUnit.SECONDS);
+              } catch(InterruptedException e) {
+                // just re-interrupt ourselves and give up
+                Thread.currentThread().interrupt();
+              }
+            }
+
+            // now we know the batch is finished
             resultQueue.add(new EndOfBatchResult());
           }
         }, "Batch \"" + getBatchId() + "\"-job-pusher");
@@ -307,65 +313,58 @@
         long startTime = System.currentTimeMillis();
         try {
           boolean jobsStillRunning = false;
-          // for each job
-          Iterator<String> jobsIter = runningJobs.keySet().iterator();
-          while(jobsIter.hasNext()) {
-            String jobId = jobsIter.next();
-            BatchHandler job = runningJobs.get(jobId);
-            if(job.getState() == JobState.RUNNING) {
-              List<ProcessResult> results = new ArrayList<ProcessResult>();
-              int resultsCount = job.resultQueue.drainTo(results);
-              boolean finishedBatch = false;
-              try {
-                for(ProcessResult result : results) {
-                  if(result.getReturnCode() == ReturnCode.END_OF_BATCH) {
-                    finishedBatch = true;
-                  } else {
-                    long fileSize = result.getOriginalFileSize();
-                    long docLength = result.getDocumentLength();
-                    if(fileSize > 0) job.totalBytes += fileSize;
-                    if(docLength > 0) job.totalChars += docLength;
-                    
-                    job.reportWriter.writeCharacters("\n");
-                    Tools.writeResultToXml(result, job.reportWriter);
-                    switch(result.getReturnCode()){
-                      case SUCCESS:
-                        job.successDocs++;
-                        break;
-                      case FAIL:
-                        job.errorDocs++;
-                        break;
-                    }
+          BatchHandler job = runningJob;
+          if(job.getState() == JobState.RUNNING) {
+            List<ProcessResult> results = new ArrayList<ProcessResult>();
+            int resultsCount = job.resultQueue.drainTo(results);
+            boolean finishedBatch = false;
+            try {
+              for(ProcessResult result : results) {
+                if(result.getReturnCode() == ReturnCode.END_OF_BATCH) {
+                  finishedBatch = true;
+                } else {
+                  long fileSize = result.getOriginalFileSize();
+                  long docLength = result.getDocumentLength();
+                  if(fileSize > 0) job.totalBytes += fileSize;
+                  if(docLength > 0) job.totalChars += docLength;
+                  
+                  job.reportWriter.writeCharacters("\n");
+                  Tools.writeResultToXml(result, job.reportWriter);
+                  switch(result.getReturnCode()){
+                    case SUCCESS:
+                      job.successDocs++;
+                      break;
+                    case FAIL:
+                      job.errorDocs++;
+                      break;
                   }
                 }
-                job.reportWriter.flush();
-                if(finishedBatch) {
-                  job.setState(JobState.FINISHED);
-                  //close the <documents> element
-                  job.reportWriter.writeCharacters("\n");
-                  job.reportWriter.writeEndElement();
-                  //write the whole batch report element
-                  Tools.writeBatchResultToXml(job, job.reportWriter);
-                  job.reportWriter.close();
-                  // this will be null if no documents needed to be processed
-                  if(job.processor != null) job.processor.dispose();
-                } else {
-                  jobsStillRunning = true;
-                }
-              } catch(XMLStreamException e) {
-                log.error("Can't write to report file for batch " + jobId
-                        + ", shutting down batch", e);
-                job.jobPusher.interrupt();
-                job.setState(JobState.ERROR);
               }
+              job.reportWriter.flush();
+              if(finishedBatch) {
+                job.setState(JobState.FINISHED);
+                //close the <documents> element
+                job.reportWriter.writeCharacters("\n");
+                job.reportWriter.writeEndElement();
+                //write the whole batch report element
+                Tools.writeBatchResultToXml(job, job.reportWriter);
+                job.reportWriter.close();
+                // this will be null if no documents needed to be processed
+                if(job.processor != null) job.processor.dispose();
+              } else {
+                jobsStillRunning = true;
+              }
+            } catch(XMLStreamException e) {
+              log.error("Can't write to report file for batch " + 
job.getBatchId()
+                      + ", shutting down batch", e);
+              job.jobPusher.interrupt();
+              job.setState(JobState.ERROR);
             }
           }
           // if all jobs finished and we should shutdown, then let's shutdown
           if(!jobsStillRunning) {
-            if(shutdownWhenFinished) {
-              shutdown();
-              finished = true;
-            }
+            shutdown();
+            finished = true;
             if(exitWhenFinished) {
               System.exit(0);
             }
@@ -389,9 +388,6 @@
    * @param numThreads
    */
   public BatchRunner(int numThreads) {
-    // private constructor to enforce singleton.
-    runningJobs = Collections
-            .synchronizedMap(new HashMap<String, BatchHandler>());
     // start the executors pool
     // create the executor
     // This is similar to an Executors.newFixedThreadPool, but instead
@@ -402,12 +398,6 @@
             TimeUnit.MILLISECONDS, new AlwaysBlockingSynchronousQueue());
   }
 
-  public void shutdownWhenFinished(boolean flag) {
-    synchronized(this) {
-      this.shutdownWhenFinished = flag;
-    }
-  }
-
   public void exitWhenFinished(boolean flag) {
     synchronized(this) {
       this.exitWhenFinished = flag;
@@ -418,16 +408,6 @@
    * Stops this batch runner in an orderly fashion.
    */
   public void shutdown() {
-    executor.shutdown();
-    while(!executor.isTerminated()) {
-      try {
-        executor.awaitTermination(60L, TimeUnit.SECONDS);
-      } catch(InterruptedException e) {
-        // just re-interrupt ourselves and give up
-        Thread.currentThread().interrupt();
-        return;
-      }
-    }
     long processingFinishedTime = System.currentTimeMillis();
     log.info("Processing finished");
     System.gc();
@@ -438,13 +418,12 @@
     if(duplicationFinishedTime==0) duplicationFinishedTime = 
loadingFinishedTime;
     log.info("Processing time (seconds): 
"+(processingFinishedTime-duplicationFinishedTime)/1000.0);
     log.info("Total time (seconds): 
"+(processingFinishedTime-startTime)/1000.0);
-    
   }
 
   /**
    * Stores data about the currently running batch jobs.
    */
-  private Map<String, BatchHandler> runningJobs;
+  private BatchHandler runningJob;
   /**
    * Executor used to run the tasks.
    */
@@ -454,11 +433,6 @@
    */
   private Thread monitorThread;
   /**
-   * A flag used to signal that the batch runner should shutdown when all
-   * currently running batches have completed.
-   */
-  private boolean shutdownWhenFinished = true;
-  /**
    * A flag used to signal that the batch runner should exit the Java process
    * when all currently running batches have completed.
    */
@@ -481,13 +455,10 @@
     synchronized(this) {
       // record the new batch
       String batchId = batch.getBatchId();
-      if(runningJobs.containsKey(batchId)) { throw new 
IllegalArgumentException(
-              "A batch with the same ID (" + batchId
-                      + ") is already in process!"); }
-      BatchHandler jobHandler = new BatchHandler(batch);
+      runningJob = new BatchHandler(batch);
       // register the batch with JMX
       try {
-        StandardMBean batchMBean = new StandardMBean(jobHandler, 
BatchJobData.class);
+        StandardMBean batchMBean = new StandardMBean(runningJob, 
BatchJobData.class);
         Hashtable<String, String> props = new Hashtable<String, String>();
         props.put("type", "Batch");
         props.put("id", ObjectName.quote(batch.getBatchId()));
@@ -497,9 +468,8 @@
       catch(JMException e) {
         log.warn("Could not register batch with platform MBean server", e);
       }
-      runningJobs.put(batchId, jobHandler);
       // queue the batch for execution
-      jobHandler.start();
+      runningJob.start();
       if(monitorThread == null) {
         // start the thread that monitors the batches, saves the reports, and
         // manages the automatic shutdown at the end of all jobs.
@@ -511,34 +481,6 @@
     }
   }
 
-  /**
-   * Checks if a batch has completed execution.
-   * 
-   * @param batchId
-   * @return <tt>true</tt> iff the batch with the given ID has completed
-   *         execution.
-   */
-  public boolean isFinished(String batchId) {
-    return getBatchData(batchId).getState() == JobState.FINISHED;
-  }
-
-  /**
-   * @param batchId
-   * @return
-   */
-  public BatchJobData getBatchData(String batchId) {
-    return null;
-  }
-
-  /**
-   * Returns a set containing the IDs for all the currently running jobs.
-   * 
-   * @return a {@link Set} of {@link String}s.
-   */
-  public Set<String> getBatchJobIDs() {
-    return new HashSet<String>(runningJobs.keySet());
-  }
-
   static long startTime = System.currentTimeMillis();
   static long loadingFinishedTime;
   static long duplicationFinishedTime;
@@ -839,7 +781,6 @@
         log.info("No documents to process, exiting");
       } else {
         instance.runBatch(aBatch);
-        instance.shutdownWhenFinished(true);
         instance.exitWhenFinished(true);
       }
     } catch(Exception e) {

This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.


------------------------------------------------------------------------------
What NetFlow Analyzer can do for you? Monitors network bandwidth and traffic
patterns at an interface-level. Reveals which users, apps, and protocols are 
consuming the most bandwidth. Provides multi-vendor support for NetFlow, 
J-Flow, sFlow and other flows. Make informed decisions using capacity 
planning reports. https://ad.doubleclick.net/ddm/clk/305295220;132659582;e
_______________________________________________
GATE-cvs mailing list
gate-cvs@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/gate-cvs

Reply via email to