Revision: 19372
http://sourceforge.net/p/gate/code/19372
Author: ian_roberts
Date: 2016-05-31 16:23:57 +0000 (Tue, 31 May 2016)
Log Message:
-----------
We've never made use of the fact that one BatchRunner could run more than one
batch at the same time, so this capability has been removed to simplify the code.
Modified Paths:
--------------
gcp/trunk/src/gate/cloud/batch/BatchRunner.java
Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java
===================================================================
--- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-05-31 16:12:01 UTC (rev 19371)
+++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-05-31 16:23:57 UTC (rev 19372)
@@ -11,27 +11,31 @@
*/
package gate.cloud.batch;
+import gate.CorpusController;
import gate.Gate;
import gate.cloud.batch.BatchJobData.JobState;
import gate.cloud.batch.ProcessResult.ReturnCode;
+import gate.cloud.io.DocumentEnumerator;
+import gate.cloud.io.IOConstants;
+import gate.cloud.io.InputHandler;
+import gate.cloud.io.OutputHandler;
+import gate.cloud.io.StreamingInputHandler;
import gate.cloud.util.CLibrary;
import gate.cloud.util.Tools;
import gate.cloud.util.XMLBatchParser;
import gate.creole.ResourceInstantiationException;
import gate.util.GateException;
+import gate.util.persistence.PersistenceManager;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Hashtable;
-import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
@@ -44,28 +48,18 @@
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
-import org.apache.commons.io.FileUtils;
-import org.apache.log4j.Logger;
-
-import com.sun.jna.Platform;
-import gate.CorpusController;
-import gate.cloud.io.DocumentData;
-import gate.cloud.io.DocumentEnumerator;
-import gate.cloud.io.IOConstants;
-import gate.cloud.io.InputHandler;
-import gate.cloud.io.OutputHandler;
-import gate.cloud.io.StreamingInputHandler;
-import gate.util.persistence.PersistenceManager;
-import java.util.LinkedList;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import com.sun.jna.Platform;
+
/**
- * This class is a Batch Runner, i.e. it manages the execution of batch jobs,
- * each of them being specified by a {@link Batch} object. This class is a
- * singleton, so you can only obtain an instance via the getInstance()method.
+ * This class is a Batch Runner, i.e. it manages the execution of a batch job,
+ * specified by a {@link Batch} object.
*/
public class BatchRunner {
private static final Logger log = Logger.getLogger(BatchRunner.class);
@@ -173,6 +167,18 @@
if(Thread.interrupted()) { return; }
}
}
+ // shut down the executor and wait for it to terminate
+ executor.shutdown();
+ while(!executor.isTerminated()) {
+ try {
+ executor.awaitTermination(60L, TimeUnit.SECONDS);
+ } catch(InterruptedException e) {
+ // just re-interrupt ourselves and give up
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ // now we know the batch is finished
resultQueue.add(new EndOfBatchResult());
}
}, "Batch \"" + getBatchId() + "\"-job-pusher");
@@ -307,65 +313,58 @@
long startTime = System.currentTimeMillis();
try {
boolean jobsStillRunning = false;
- // for each job
- Iterator<String> jobsIter = runningJobs.keySet().iterator();
- while(jobsIter.hasNext()) {
- String jobId = jobsIter.next();
- BatchHandler job = runningJobs.get(jobId);
- if(job.getState() == JobState.RUNNING) {
- List<ProcessResult> results = new ArrayList<ProcessResult>();
- int resultsCount = job.resultQueue.drainTo(results);
- boolean finishedBatch = false;
- try {
- for(ProcessResult result : results) {
- if(result.getReturnCode() == ReturnCode.END_OF_BATCH) {
- finishedBatch = true;
- } else {
- long fileSize = result.getOriginalFileSize();
- long docLength = result.getDocumentLength();
- if(fileSize > 0) job.totalBytes += fileSize;
- if(docLength > 0) job.totalChars += docLength;
-
- job.reportWriter.writeCharacters("\n");
- Tools.writeResultToXml(result, job.reportWriter);
- switch(result.getReturnCode()){
- case SUCCESS:
- job.successDocs++;
- break;
- case FAIL:
- job.errorDocs++;
- break;
- }
+ BatchHandler job = runningJob;
+ if(job.getState() == JobState.RUNNING) {
+ List<ProcessResult> results = new ArrayList<ProcessResult>();
+ int resultsCount = job.resultQueue.drainTo(results);
+ boolean finishedBatch = false;
+ try {
+ for(ProcessResult result : results) {
+ if(result.getReturnCode() == ReturnCode.END_OF_BATCH) {
+ finishedBatch = true;
+ } else {
+ long fileSize = result.getOriginalFileSize();
+ long docLength = result.getDocumentLength();
+ if(fileSize > 0) job.totalBytes += fileSize;
+ if(docLength > 0) job.totalChars += docLength;
+
+ job.reportWriter.writeCharacters("\n");
+ Tools.writeResultToXml(result, job.reportWriter);
+ switch(result.getReturnCode()){
+ case SUCCESS:
+ job.successDocs++;
+ break;
+ case FAIL:
+ job.errorDocs++;
+ break;
}
}
- job.reportWriter.flush();
- if(finishedBatch) {
- job.setState(JobState.FINISHED);
- //close the <documents> element
- job.reportWriter.writeCharacters("\n");
- job.reportWriter.writeEndElement();
- //write the whole batch report element
- Tools.writeBatchResultToXml(job, job.reportWriter);
- job.reportWriter.close();
- // this will be null if no documents needed to be processed
- if(job.processor != null) job.processor.dispose();
- } else {
- jobsStillRunning = true;
- }
- } catch(XMLStreamException e) {
- log.error("Can't write to report file for batch " + jobId
- + ", shutting down batch", e);
- job.jobPusher.interrupt();
- job.setState(JobState.ERROR);
}
+ job.reportWriter.flush();
+ if(finishedBatch) {
+ job.setState(JobState.FINISHED);
+ //close the <documents> element
+ job.reportWriter.writeCharacters("\n");
+ job.reportWriter.writeEndElement();
+ //write the whole batch report element
+ Tools.writeBatchResultToXml(job, job.reportWriter);
+ job.reportWriter.close();
+ // this will be null if no documents needed to be processed
+ if(job.processor != null) job.processor.dispose();
+ } else {
+ jobsStillRunning = true;
+ }
+ } catch(XMLStreamException e) {
+ log.error("Can't write to report file for batch " + job.getBatchId()
+ + ", shutting down batch", e);
+ job.jobPusher.interrupt();
+ job.setState(JobState.ERROR);
}
}
// if all jobs finished and we should shutdown, then let's shutdown
if(!jobsStillRunning) {
- if(shutdownWhenFinished) {
- shutdown();
- finished = true;
- }
+ shutdown();
+ finished = true;
if(exitWhenFinished) {
System.exit(0);
}
@@ -389,9 +388,6 @@
* @param numThreads
*/
public BatchRunner(int numThreads) {
- // private constructor to enforce singleton.
- runningJobs = Collections
- .synchronizedMap(new HashMap<String, BatchHandler>());
// start the executors pool
// create the executor
// This is similar to an Executors.newFixedThreadPool, but instead
@@ -402,12 +398,6 @@
TimeUnit.MILLISECONDS, new AlwaysBlockingSynchronousQueue());
}
- public void shutdownWhenFinished(boolean flag) {
- synchronized(this) {
- this.shutdownWhenFinished = flag;
- }
- }
-
public void exitWhenFinished(boolean flag) {
synchronized(this) {
this.exitWhenFinished = flag;
@@ -418,16 +408,6 @@
* Stops this batch runner in an orderly fashion.
*/
public void shutdown() {
- executor.shutdown();
- while(!executor.isTerminated()) {
- try {
- executor.awaitTermination(60L, TimeUnit.SECONDS);
- } catch(InterruptedException e) {
- // just re-interrupt ourselves and give up
- Thread.currentThread().interrupt();
- return;
- }
- }
long processingFinishedTime = System.currentTimeMillis();
log.info("Processing finished");
System.gc();
@@ -438,13 +418,12 @@
if(duplicationFinishedTime==0) duplicationFinishedTime = loadingFinishedTime;
log.info("Processing time (seconds): "+(processingFinishedTime-duplicationFinishedTime)/1000.0);
log.info("Total time (seconds): "+(processingFinishedTime-startTime)/1000.0);
-
}
/**
* Stores data about the currently running batch jobs.
*/
- private Map<String, BatchHandler> runningJobs;
+ private BatchHandler runningJob;
/**
* Executor used to run the tasks.
*/
@@ -454,11 +433,6 @@
*/
private Thread monitorThread;
/**
- * A flag used to signal that the batch runner should shutdown when all
- * currently running batches have completed.
- */
- private boolean shutdownWhenFinished = true;
- /**
* A flag used to signal that the batch runner should exit the Java process
* when all currently running batches have completed.
*/
@@ -481,13 +455,10 @@
synchronized(this) {
// record the new batch
String batchId = batch.getBatchId();
- if(runningJobs.containsKey(batchId)) { throw new IllegalArgumentException(
- "A batch with the same ID (" + batchId
- + ") is already in process!"); }
- BatchHandler jobHandler = new BatchHandler(batch);
+ runningJob = new BatchHandler(batch);
// register the batch with JMX
try {
- StandardMBean batchMBean = new StandardMBean(jobHandler, BatchJobData.class);
+ StandardMBean batchMBean = new StandardMBean(runningJob, BatchJobData.class);
Hashtable<String, String> props = new Hashtable<String, String>();
props.put("type", "Batch");
props.put("id", ObjectName.quote(batch.getBatchId()));
@@ -497,9 +468,8 @@
catch(JMException e) {
log.warn("Could not register batch with platform MBean server", e);
}
- runningJobs.put(batchId, jobHandler);
// queue the batch for execution
- jobHandler.start();
+ runningJob.start();
if(monitorThread == null) {
// start the thread that monitors the batches, saves the reports, and
// manages the automatic shutdown at the end of all jobs.
@@ -511,34 +481,6 @@
}
}
- /**
- * Checks if a batch has completed execution.
- *
- * @param batchId
- * @return <tt>true</tt> iff the batch with the given ID has completed
- * execution.
- */
- public boolean isFinished(String batchId) {
- return getBatchData(batchId).getState() == JobState.FINISHED;
- }
-
- /**
- * @param batchId
- * @return
- */
- public BatchJobData getBatchData(String batchId) {
- return null;
- }
-
- /**
- * Returns a set containing the IDs for all the currently running jobs.
- *
- * @return a {@link Set} of {@link String}s.
- */
- public Set<String> getBatchJobIDs() {
- return new HashSet<String>(runningJobs.keySet());
- }
-
static long startTime = System.currentTimeMillis();
static long loadingFinishedTime;
static long duplicationFinishedTime;
@@ -839,7 +781,6 @@
log.info("No documents to process, exiting");
} else {
instance.runBatch(aBatch);
- instance.shutdownWhenFinished(true);
instance.exitWhenFinished(true);
}
} catch(Exception e) {
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
What NetFlow Analyzer can do for you? Monitors network bandwidth and traffic
patterns at an interface-level. Reveals which users, apps, and protocols are
consuming the most bandwidth. Provides multi-vendor support for NetFlow,
J-Flow, sFlow and other flows. Make informed decisions using capacity
planning reports. https://ad.doubleclick.net/ddm/clk/305295220;132659582;e
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs