Revision: 19649
http://sourceforge.net/p/gate/code/19649
Author: johann_p
Date: 2016-10-07 18:47:01 +0000 (Fri, 07 Oct 2016)
Log Message:
-----------
Make the option -o for gcp-direct mode optional instead of required.
If it is missing, the documents will not be saved.
This is useful if the pipeline contains a PR that takes care
of the saving or produces other kinds of output.
Modified Paths:
--------------
gcp/trunk/doc/gcp-guide.pdf
gcp/trunk/doc/install-and-run.tex
gcp/trunk/doc/introduction.tex
gcp/trunk/src/gate/cloud/batch/BatchRunner.java
Modified: gcp/trunk/doc/gcp-guide.pdf
===================================================================
(Binary files differ)
Modified: gcp/trunk/doc/install-and-run.tex
===================================================================
--- gcp/trunk/doc/install-and-run.tex 2016-10-07 15:42:09 UTC (rev 19648)
+++ gcp/trunk/doc/install-and-run.tex 2016-10-07 18:47:01 UTC (rev 19649)
@@ -145,8 +145,14 @@
backup and temporary file name patterns and source control metadata -- see
\url{http://ant.apache.org/manual/dirtasks.html#defaultexcludes} for
details).
-\item[-o] the directory in which to place the output files. Each input file
+\item[-o] (optional) the directory in which to place the output files. Each
input file
will generate an output file with the same name in the output directory.
+ If this option is missing, and the option \texttt{-b} is missing as well,
+ the documents are not saved!
+\item[-b] (optional) the batch configuration file to use. If this option is
+ given, the options \texttt{-x, -i, -o, -r, -I} are not required and are
+ ignored if specified; the corresponding information is taken from the
+ batch configuration file instead.
\item[-r] (optional) path to the report file for this batch -- if omitted
GCP will use \verb!report.xml! in the current directory.
\item[-ci] the input files are all gzip-compressed
Modified: gcp/trunk/doc/introduction.tex
===================================================================
--- gcp/trunk/doc/introduction.tex 2016-10-07 15:42:09 UTC (rev 19648)
+++ gcp/trunk/doc/introduction.tex 2016-10-07 18:47:01 UTC (rev 19649)
@@ -134,6 +134,10 @@
This section summarises the main changes between releases of GCP
+%% New after 2.6:
+%% option -o for gcp-direct is not required any more; if it is missing,
+%% documents are not saved.
+
\subsection{2.6 (June 2016)}
This is a minor bugfix release, the main change is that GCP now depends on GATE
Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java
===================================================================
--- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-10-07 15:42:09 UTC
(rev 19648)
+++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-10-07 18:47:01 UTC
(rev 19649)
@@ -1,14 +1,14 @@
/*
- * BatchRunner.java
- * Copyright (c) 2007-2011, The University of Sheffield.
- *
- * This file is part of GCP (see http://gate.ac.uk/), and is free
- * software, licenced under the GNU Affero General Public License,
- * Version 3, November 2007.
- *
- *
- * $Id$
- */
+* BatchRunner.java
+* Copyright (c) 2007-2011, The University of Sheffield.
+*
+* This file is part of GCP (see http://gate.ac.uk/), and is free
+* software, licenced under the GNU Affero General Public License,
+* Version 3, November 2007.
+*
+*
+* $Id$
+*/
package gate.cloud.batch;
import gate.CorpusController;
@@ -58,405 +58,405 @@
import com.sun.jna.Platform;
/**
- * This class is a Batch Runner, i.e. it manages the execution of a batch job,
- * specified by a {@link Batch} object.
- */
+* This class is a Batch Runner, i.e. it manages the execution of a batch job,
+* specified by a {@link Batch} object.
+*/
public class BatchRunner {
- private static final Logger log = Logger.getLogger(BatchRunner.class);
- //private static final long LOOP_WAIT = 5 * 60 * 1000;
- private static final long LOOP_WAIT = 10 * 1000;
+private static final Logger log = Logger.getLogger(BatchRunner.class);
+//private static final long LOOP_WAIT = 5 * 60 * 1000;
+private static final long LOOP_WAIT = 10 * 1000;
- /**
- * This class manages the execution of a batch job. It also exposes a
- * {@link BatchJobData} interface that provides information about the
- * execution progress.
- */
- private class BatchHandler implements BatchJobData {
- /**
- * The document processor that runs the actual jobs.
- */
- private DocumentProcessor processor;
-
- /**
- * The batch being run.
- */
- private Batch batch;
-
- private int totalDocs;
- private int successDocs;
- private int errorDocs;
- private JobState state;
- private String id;
-
- /**
- * The moment when the execution of this batch started.
- */
- private long startTime;
-
- /**
- * The sum of {@link ProcessResult#getOriginalFileSize()} values for all
- * processed documents.
- */
- private long totalBytes;
-
- /**
- * The sum of {@link ProcessResult#getDocumentLength()} values for all
- * processed documents.
- */
- private long totalChars;
-
- /**
- * The results queue for this batch.
- */
- private BlockingQueue<ProcessResult> resultQueue;
- /**
- * The report file for this batch.
- */
- private File reportFile;
- /**
- * Writer to write the report file.
- */
- private XMLStreamWriter reportWriter;
- /**
- * Thread that pushes jobs into the DocumentProcessor for this batch.
- */
- private Thread jobPusher;
+/**
+* This class manages the execution of a batch job. It also exposes a
+* {@link BatchJobData} interface that provides information about the
+* execution progress.
+*/
+private class BatchHandler implements BatchJobData {
+/**
+* The document processor that runs the actual jobs.
+*/
+private DocumentProcessor processor;
- private BatchHandler(final Batch batch) throws GateException, IOException {
- successDocs = 0;
- errorDocs = 0;
- totalBytes = 0;
- totalChars = 0;
- this.batch = batch;
- id = batch.getBatchId();
- }
+/**
+* The batch being run.
+*/
+private Batch batch;
- public void start() throws IOException, XMLStreamException,
- ResourceInstantiationException {
- reportWriter = batch.getReportWriter();
- // any existing report file has now been processed, so we know
- // the correct number of unprocessed document IDs
- totalDocs = batch.getUnprocessedDocumentIDs() == null ? -1 :
batch.getUnprocessedDocumentIDs().length;
- startTime = System.currentTimeMillis();
- setState(JobState.RUNNING);
- resultQueue = new LinkedBlockingQueue<ProcessResult>();
- if(totalDocs != 0) {
- final InputHandler inputHandler = batch.getInputHandler();
- processor = new PooledDocumentProcessor(executor.getCorePoolSize());
- processor.setController(batch.getGateApplication());
- processor.setExecutor(executor);
- processor.setInputHandler(inputHandler);
- processor.setOutputHandlers(batch.getOutputs());
- processor.setResultQueue(resultQueue);
- processor.init();
- log.info("Duplication finished");
- System.gc();
- log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M");
- log.info("Used memory:
"+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M");
- duplicationFinishedTime = System.currentTimeMillis();
- log.info("Duplication time (seconds):
"+(duplicationFinishedTime-loadingFinishedTime)/1000.0);
- jobPusher = new Thread(new Runnable() {
- public void run() {
- if(batch.getDocumentIDs() == null && inputHandler instanceof
StreamingInputHandler) {
- ((StreamingInputHandler)inputHandler).startBatch(batch);
- processor.processStreaming();
- if(Thread.interrupted()) { return; }
- } else {
- for(DocumentID id : batch.getUnprocessedDocumentIDs()) {
- processor.processDocument(id);
- if(Thread.interrupted()) { return; }
- }
- }
- // shut down the executor and wait for it to terminate
- executor.shutdown();
- while(!executor.isTerminated()) {
- try {
- executor.awaitTermination(60L, TimeUnit.SECONDS);
- } catch(InterruptedException e) {
- // just re-interrupt ourselves and give up
- Thread.currentThread().interrupt();
- }
- }
+private int totalDocs;
+private int successDocs;
+private int errorDocs;
+private JobState state;
+private String id;
- // now we know the batch is finished
- resultQueue.add(new EndOfBatchResult());
- }
- }, "Batch \"" + getBatchId() + "\"-job-pusher");
- jobPusher.start();
+/**
+* The moment when the execution of this batch started.
+*/
+private long startTime;
+
+/**
+* The sum of {@link ProcessResult#getOriginalFileSize()} values for all
+* processed documents.
+*/
+private long totalBytes;
+
+/**
+* The sum of {@link ProcessResult#getDocumentLength()} values for all
+* processed documents.
+*/
+private long totalChars;
+
+/**
+* The results queue for this batch.
+*/
+private BlockingQueue<ProcessResult> resultQueue;
+/**
+* The report file for this batch.
+*/
+private File reportFile;
+/**
+* Writer to write the report file.
+*/
+private XMLStreamWriter reportWriter;
+/**
+* Thread that pushes jobs into the DocumentProcessor for this batch.
+*/
+private Thread jobPusher;
+
+private BatchHandler(final Batch batch) throws GateException, IOException {
+successDocs = 0;
+errorDocs = 0;
+totalBytes = 0;
+totalChars = 0;
+this.batch = batch;
+id = batch.getBatchId();
+}
+
+public void start() throws IOException, XMLStreamException,
+ResourceInstantiationException {
+reportWriter = batch.getReportWriter();
+// any existing report file has now been processed, so we know
+// the correct number of unprocessed document IDs
+totalDocs = batch.getUnprocessedDocumentIDs() == null ? -1 :
batch.getUnprocessedDocumentIDs().length;
+startTime = System.currentTimeMillis();
+setState(JobState.RUNNING);
+resultQueue = new LinkedBlockingQueue<ProcessResult>();
+if(totalDocs != 0) {
+final InputHandler inputHandler = batch.getInputHandler();
+processor = new PooledDocumentProcessor(executor.getCorePoolSize());
+processor.setController(batch.getGateApplication());
+processor.setExecutor(executor);
+processor.setInputHandler(inputHandler);
+processor.setOutputHandlers(batch.getOutputs());
+processor.setResultQueue(resultQueue);
+processor.init();
+log.info("Duplication finished");
+System.gc();
+log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M");
+log.info("Used memory:
"+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M");
+duplicationFinishedTime = System.currentTimeMillis();
+log.info("Duplication time (seconds):
"+(duplicationFinishedTime-loadingFinishedTime)/1000.0);
+jobPusher = new Thread(new Runnable() {
+ public void run() {
+ if(batch.getDocumentIDs() == null && inputHandler instanceof
StreamingInputHandler) {
+ ((StreamingInputHandler)inputHandler).startBatch(batch);
+ processor.processStreaming();
+ if(Thread.interrupted()) { return; }
+ } else {
+ for(DocumentID id : batch.getUnprocessedDocumentIDs()) {
+ processor.processDocument(id);
+ if(Thread.interrupted()) { return; }
}
}
-
- /*
- * (non-Javadoc)
- *
- * @see gate.sam.batch.BatchJobData#getErrorDocumentCount()
- */
- public int getErrorDocumentCount() {
- return errorDocs;
+ // shut down the executor and wait for it to terminate
+ executor.shutdown();
+ while(!executor.isTerminated()) {
+ try {
+ executor.awaitTermination(60L, TimeUnit.SECONDS);
+ } catch(InterruptedException e) {
+ // just re-interrupt ourselves and give up
+ Thread.currentThread().interrupt();
+ }
}
- /*
- * (non-Javadoc)
- *
- * @see gate.sam.batch.BatchJobData#getProcessedDocumentCount()
- */
- public int getProcessedDocumentCount() {
- return errorDocs + successDocs;
- }
+ // now we know the batch is finished
+ resultQueue.add(new EndOfBatchResult());
+ }
+}, "Batch \"" + getBatchId() + "\"-job-pusher");
+jobPusher.start();
+}
+}
- /*
- * (non-Javadoc)
- *
- * @see gate.sam.batch.BatchJobData#getRemainingDocumentCount()
- */
- public int getRemainingDocumentCount() {
- return (totalDocs < 0) ? -1 : totalDocs - errorDocs - successDocs;
- }
+/*
+* (non-Javadoc)
+*
+* @see gate.sam.batch.BatchJobData#getErrorDocumentCount()
+*/
+public int getErrorDocumentCount() {
+return errorDocs;
+}
- /*
- * (non-Javadoc)
- *
- * @see gate.sam.batch.BatchJobData#getSuccessDocumentCount()
- */
- public int getSuccessDocumentCount() {
- return successDocs;
- }
+/*
+* (non-Javadoc)
+*
+* @see gate.sam.batch.BatchJobData#getProcessedDocumentCount()
+*/
+public int getProcessedDocumentCount() {
+return errorDocs + successDocs;
+}
- /*
- * (non-Javadoc)
- *
- * @see gate.sam.batch.BatchJobData#getTotalDocumentCount()
- */
- public int getTotalDocumentCount() {
- return totalDocs;
- }
+/*
+* (non-Javadoc)
+*
+* @see gate.sam.batch.BatchJobData#getRemainingDocumentCount()
+*/
+public int getRemainingDocumentCount() {
+return (totalDocs < 0) ? -1 : totalDocs - errorDocs - successDocs;
+}
- public long getTotalDocumentLength() {
- return totalChars;
- }
+/*
+* (non-Javadoc)
+*
+* @see gate.sam.batch.BatchJobData#getSuccessDocumentCount()
+*/
+public int getSuccessDocumentCount() {
+return successDocs;
+}
- public long getTotalFileSize() {
- return totalBytes;
- }
+/*
+* (non-Javadoc)
+*
+* @see gate.sam.batch.BatchJobData#getTotalDocumentCount()
+*/
+public int getTotalDocumentCount() {
+return totalDocs;
+}
- /*
- * (non-Javadoc)
- *
- * @see gate.sam.batch.BatchJobData#isFinished()
- */
- public JobState getState() {
- return state;
- }
+public long getTotalDocumentLength() {
+return totalChars;
+}
- private void setState(JobState state) {
- this.state = state;
- }
+public long getTotalFileSize() {
+return totalBytes;
+}
- private void setErrorDocumentCount(int newCount) {
- errorDocs = newCount;
- }
+/*
+* (non-Javadoc)
+*
+* @see gate.sam.batch.BatchJobData#isFinished()
+*/
+public JobState getState() {
+return state;
+}
- private void setSuccessDocumentCount(int newCount) {
- successDocs = newCount;
- }
+private void setState(JobState state) {
+this.state = state;
+}
- /* (non-Javadoc)
- * @see gate.sam.batch.BatchJobData#getStartTime()
- */
- public long getStartTime() {
- return startTime;
- }
+private void setErrorDocumentCount(int newCount) {
+errorDocs = newCount;
+}
- /* (non-Javadoc)
- * @see gate.sam.batch.BatchJobData#getBatchId()
- */
- public String getBatchId() {
- return id;
- }
-
-
- }
- /**
- * A SynchronousQueue in which offer delegates to put. ThreadPoolExecutor
uses
- * offer to run a new task. Using put instead means that when all the threads
- * in the pool are occupied, execute will wait for one of them to become
free,
- * rather than failing to submit the task.
- */
- public static class AlwaysBlockingSynchronousQueue
- extends
-
SynchronousQueue<Runnable> {
- /**
- * Yes, I know this technically breaks the contract of BlockingQueue, but
it
- * works for this case.
- */
- public boolean offer(Runnable task) {
- try {
- put(task);
- } catch(InterruptedException e) {
- return false;
+private void setSuccessDocumentCount(int newCount) {
+successDocs = newCount;
+}
+
+/* (non-Javadoc)
+* @see gate.sam.batch.BatchJobData#getStartTime()
+*/
+public long getStartTime() {
+return startTime;
+}
+
+/* (non-Javadoc)
+* @see gate.sam.batch.BatchJobData#getBatchId()
+*/
+public String getBatchId() {
+return id;
+}
+
+
+}
+/**
+* A SynchronousQueue in which offer delegates to put. ThreadPoolExecutor uses
+* offer to run a new task. Using put instead means that when all the threads
+* in the pool are occupied, execute will wait for one of them to become free,
+* rather than failing to submit the task.
+*/
+public static class AlwaysBlockingSynchronousQueue
+ extends
+ SynchronousQueue<Runnable> {
+/**
+* Yes, I know this technically breaks the contract of BlockingQueue, but it
+* works for this case.
+*/
+public boolean offer(Runnable task) {
+try {
+put(task);
+} catch(InterruptedException e) {
+return false;
+}
+return true;
+}
+}
+/**
+* A thread that runs continuously while the batch runner is active. Its role
+* is to monitor the running jobs, collect process results, save the report
+* files for each running batch, and shutdown the batch runner and/or Java
+* process when all the batches have completed (if requested via the
+* {@link BatchRunner#shutdownWhenFinished(boolean)} and
+* {@link BatchRunner#exitWhenFinished(boolean)} methods).
+*/
+private class JobMonitor implements Runnable {
+public void run() {
+boolean finished = false;
+while(!finished) {
+long startTime = System.currentTimeMillis();
+try {
+ boolean jobsStillRunning = false;
+ BatchHandler job = runningJob;
+ if(job.getState() == JobState.RUNNING) {
+ List<ProcessResult> results = new ArrayList<ProcessResult>();
+ int resultsCount = job.resultQueue.drainTo(results);
+ boolean finishedBatch = false;
+ try {
+ for(ProcessResult result : results) {
+ if(result.getReturnCode() == ReturnCode.END_OF_BATCH) {
+ finishedBatch = true;
+ } else {
+ long fileSize = result.getOriginalFileSize();
+ long docLength = result.getDocumentLength();
+ if(fileSize > 0) job.totalBytes += fileSize;
+ if(docLength > 0) job.totalChars += docLength;
+
+ job.reportWriter.writeCharacters("\n");
+ Tools.writeResultToXml(result, job.reportWriter);
+ switch(result.getReturnCode()){
+ case SUCCESS:
+ job.successDocs++;
+ break;
+ case FAIL:
+ job.errorDocs++;
+ break;
+ }
+ }
}
- return true;
+ job.reportWriter.flush();
+ if(finishedBatch) {
+ job.setState(JobState.FINISHED);
+ //close the <documents> element
+ job.reportWriter.writeCharacters("\n");
+ job.reportWriter.writeEndElement();
+ //write the whole batch report element
+ Tools.writeBatchResultToXml(job, job.reportWriter);
+ job.reportWriter.close();
+ // this will be null if no documents needed to be processed
+ if(job.processor != null) job.processor.dispose();
+ } else {
+ jobsStillRunning = true;
+ }
+ } catch(XMLStreamException e) {
+ log.error("Can't write to report file for batch " + job.getBatchId()
+ + ", shutting down batch", e);
+ job.jobPusher.interrupt();
+ job.setState(JobState.ERROR);
}
}
- /**
- * A thread that runs continuously while the batch runner is active. Its role
- * is to monitor the running jobs, collect process results, save the report
- * files for each running batch, and shutdown the batch runner and/or Java
- * process when all the batches have completed (if requested via the
- * {@link BatchRunner#shutdownWhenFinished(boolean)} and
- * {@link BatchRunner#exitWhenFinished(boolean)} methods).
- */
- private class JobMonitor implements Runnable {
- public void run() {
- boolean finished = false;
- while(!finished) {
- long startTime = System.currentTimeMillis();
- try {
- boolean jobsStillRunning = false;
- BatchHandler job = runningJob;
- if(job.getState() == JobState.RUNNING) {
- List<ProcessResult> results = new ArrayList<ProcessResult>();
- int resultsCount = job.resultQueue.drainTo(results);
- boolean finishedBatch = false;
- try {
- for(ProcessResult result : results) {
- if(result.getReturnCode() == ReturnCode.END_OF_BATCH) {
- finishedBatch = true;
- } else {
- long fileSize = result.getOriginalFileSize();
- long docLength = result.getDocumentLength();
- if(fileSize > 0) job.totalBytes += fileSize;
- if(docLength > 0) job.totalChars += docLength;
-
- job.reportWriter.writeCharacters("\n");
- Tools.writeResultToXml(result, job.reportWriter);
- switch(result.getReturnCode()){
- case SUCCESS:
- job.successDocs++;
- break;
- case FAIL:
- job.errorDocs++;
- break;
- }
- }
- }
- job.reportWriter.flush();
- if(finishedBatch) {
- job.setState(JobState.FINISHED);
- //close the <documents> element
- job.reportWriter.writeCharacters("\n");
- job.reportWriter.writeEndElement();
- //write the whole batch report element
- Tools.writeBatchResultToXml(job, job.reportWriter);
- job.reportWriter.close();
- // this will be null if no documents needed to be processed
- if(job.processor != null) job.processor.dispose();
- } else {
- jobsStillRunning = true;
- }
- } catch(XMLStreamException e) {
- log.error("Can't write to report file for batch " +
job.getBatchId()
- + ", shutting down batch", e);
- job.jobPusher.interrupt();
- job.setState(JobState.ERROR);
- }
- }
- // if all jobs finished and we should shutdown, then let's shutdown
- if(!jobsStillRunning) {
- shutdown();
- finished = true;
- if(exitWhenFinished) {
- System.exit(0);
- }
- }
- long remainingSleepTime = LOOP_WAIT
- - (System.currentTimeMillis() - startTime);
- if(!finished && remainingSleepTime > 0)
- Thread.sleep(remainingSleepTime);
- } catch(InterruptedException e) {
- // re-interrupt
- Thread.currentThread().interrupt();
- finished = true;
- }
- }
+ // if all jobs finished and we should shutdown, then let's shutdown
+ if(!jobsStillRunning) {
+ shutdown();
+ finished = true;
+ if(exitWhenFinished) {
+ System.exit(0);
}
}
+ long remainingSleepTime = LOOP_WAIT
+ - (System.currentTimeMillis() - startTime);
+ if(!finished && remainingSleepTime > 0)
+ Thread.sleep(remainingSleepTime);
+} catch(InterruptedException e) {
+ // re-interrupt
+ Thread.currentThread().interrupt();
+ finished = true;
+}
+}
+}
+}
- /**
- * Creates a new BatchRunner, with a given number of threads.
- *
- * @param numThreads
- */
- public BatchRunner(int numThreads) {
- // start the executors pool
- // create the executor
- // This is similar to an Executors.newFixedThreadPool, but instead
- // of an unbounded queue for tasks, we block the caller when they
- // try and execute a task and there are no threads available to run
- // it.
- executor = new ThreadPoolExecutor(numThreads, numThreads, 0L,
- TimeUnit.MILLISECONDS, new AlwaysBlockingSynchronousQueue());
- }
+/**
+* Creates a new BatchRunner, with a given number of threads.
+*
+* @param numThreads
+*/
+public BatchRunner(int numThreads) {
+// start the executors pool
+// create the executor
+// This is similar to an Executors.newFixedThreadPool, but instead
+// of an unbounded queue for tasks, we block the caller when they
+// try and execute a task and there are no threads available to run
+// it.
+executor = new ThreadPoolExecutor(numThreads, numThreads, 0L,
+ TimeUnit.MILLISECONDS, new AlwaysBlockingSynchronousQueue());
+}
- public void exitWhenFinished(boolean flag) {
- synchronized(this) {
- this.exitWhenFinished = flag;
- }
- }
+public void exitWhenFinished(boolean flag) {
+synchronized(this) {
+this.exitWhenFinished = flag;
+}
+}
- /**
- * Stops this batch runner in an orderly fashion.
- */
- public void shutdown() {
- long processingFinishedTime = System.currentTimeMillis();
- log.info("Processing finished");
- System.gc();
- log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M");
- log.info("Used memory:
"+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M");
- // if we did not need to process anything then the duplicationFinishedTime
will not have
- // been set and be 0. In that case, set it to the loadingFinishedTime
- if(duplicationFinishedTime==0) duplicationFinishedTime =
loadingFinishedTime;
- log.info("Processing time (seconds):
"+(processingFinishedTime-duplicationFinishedTime)/1000.0);
- log.info("Total time (seconds):
"+(processingFinishedTime-startTime)/1000.0);
- }
+/**
+* Stops this batch runner in an orderly fashion.
+*/
+public void shutdown() {
+long processingFinishedTime = System.currentTimeMillis();
+log.info("Processing finished");
+System.gc();
+log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M");
+log.info("Used memory:
"+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M");
+// if we did not need to process anything then the duplicationFinishedTime
will not have
+// been set and be 0. In that case, set it to the loadingFinishedTime
+if(duplicationFinishedTime==0) duplicationFinishedTime = loadingFinishedTime;
+log.info("Processing time (seconds):
"+(processingFinishedTime-duplicationFinishedTime)/1000.0);
+log.info("Total time (seconds): "+(processingFinishedTime-startTime)/1000.0);
+}
- /**
- * Stores data about the currently running batch jobs.
- */
- private BatchHandler runningJob;
- /**
- * Executor used to run the tasks.
- */
- private ThreadPoolExecutor executor;
- /**
- * Thread to monitor jobs.
- */
- private Thread monitorThread;
- /**
- * A flag used to signal that the batch runner should exit the Java process
- * when all currently running batches have completed.
- */
- private boolean exitWhenFinished = true;
+/**
+* Stores data about the currently running batch jobs.
+*/
+private BatchHandler runningJob;
+/**
+* Executor used to run the tasks.
+*/
+private ThreadPoolExecutor executor;
+/**
+* Thread to monitor jobs.
+*/
+private Thread monitorThread;
+/**
+* A flag used to signal that the batch runner should exit the Java process
+* when all currently running batches have completed.
+*/
+private boolean exitWhenFinished = true;
- /**
- * Starts executing the batch task specified by the provided parameter.
- *
- * @param batch
- * a {@link Batch} object describing a batch job.
- * @throws IllegalArgumentException
- * if there are problems with the provided batch specification
(e.g.
- * the new batch has the same ID as a previous batch).
- * @throws IOException
- * @throws GateException
- * @throws XMLStreamException
- */
- public void runBatch(Batch batch) throws IllegalArgumentException,
- GateException, IOException, XMLStreamException {
- synchronized(this) {
- // record the new batch
- String batchId = batch.getBatchId();
- runningJob = new BatchHandler(batch);
- // register the batch with JMX
+/**
+* Starts executing the batch task specified by the provided parameter.
+*
+* @param batch
+* a {@link Batch} object describing a batch job.
+* @throws IllegalArgumentException
+* if there are problems with the provided batch specification (e.g.
+* the new batch has the same ID as a previous batch).
+* @throws IOException
+* @throws GateException
+* @throws XMLStreamException
+*/
+public void runBatch(Batch batch) throws IllegalArgumentException,
+ GateException, IOException, XMLStreamException {
+synchronized(this) {
+// record the new batch
+String batchId = batch.getBatchId();
+runningJob = new BatchHandler(batch);
+// register the batch with JMX
try {
StandardMBean batchMBean = new StandardMBean(runningJob,
BatchJobData.class);
Hashtable<String, String> props = new Hashtable<String, String>();
@@ -511,7 +511,7 @@
options.addOption("b","batchFile",true,"Batch file (required, replaces -i,
-o, -x, -r, -I)");
options.addOption("i","inputDirectory",true,"Input directory (required,
unless -b given)");
options.addOption("f","outputFormat",true,"Output format, optional, one of
'xml' or 'finf', default is 'finf'");
- options.addOption("o","outputDirectory",true,"Output directory (requried,
unless -b given)");
+ options.addOption("o","outputDirectory",true,"Output directory (not output
if missing)");
options.addOption("x","executePipeline",true,"Pipeline/application file to
execute (required, unless -b given)");
options.addOption("r","reportFile",true,"Report file (optional, default:
report.xml");
options.addOption("t","numberThreads",true,"Number of threads to use
(required)");
@@ -552,7 +552,7 @@
System.exit(0);
}
if(!line.hasOption('t') ||
- (!line.hasOption('b') && (!line.hasOption('i') ||
!line.hasOption('o') || !line.hasOption('x')))) {
+ (!line.hasOption('b') && (!line.hasOption('i') ||
!line.hasOption('x')))) {
log.error("Required argument missing!");
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp("gcp-direct.sh [options]", options);
@@ -714,39 +714,42 @@
inputHandler.init();
// log.info("Have input handler: "+inputHandler);
aBatch.setInputHandler(inputHandler);
- // set the output Handler
- String outputHandlerClassName = null;
- if(outFormat.equals("finf")) {
- outputHandlerClassName =
"gate.cloud.io.file.FastInfosetOutputHandler";
- } else if(outFormat.equals("xml")) {
- outputHandlerClassName =
"gate.cloud.io.file.GATEStandOffFileOutputHandler";
- }
- configData = new HashMap<String, String>();
- configData.put(IOConstants.PARAM_DOCUMENT_ROOT,
line.getOptionValue('o'));
- String outExt = ".finf";
- if(outFormat.equals("xml")) {
- outExt = ".xml";
- }
- if(line.hasOption("co")) {
- configData.put(IOConstants.PARAM_COMPRESSION,"gzip");
- outExt = outExt + ".gz";
- } else {
- configData.put(IOConstants.PARAM_COMPRESSION,"none");
- }
- configData.put(IOConstants.PARAM_FILE_EXTENSION,outExt);
- configData.put(IOConstants.PARAM_ENCODING, "UTF-8");
- configData.put(IOConstants.PARAM_REPLACE_EXTENSION, "true");
- Class<? extends OutputHandler> ouputHandlerClass =
- Class.forName(outputHandlerClassName, true, Gate.getClassLoader())
+ // set the output Handler, but only if the option -o has been
specified,
+ // otherwise, do not set an output handler (run for side-effects
only)
+ List<OutputHandler> outHandlers = new ArrayList<OutputHandler>();
+ if(line.hasOption('o')) {
+ String outputHandlerClassName = null;
+ if(outFormat.equals("finf")) {
+ outputHandlerClassName =
"gate.cloud.io.file.FastInfosetOutputHandler";
+ } else if(outFormat.equals("xml")) {
+ outputHandlerClassName =
"gate.cloud.io.file.GATEStandOffFileOutputHandler";
+ }
+ configData = new HashMap<String, String>();
+ configData.put(IOConstants.PARAM_DOCUMENT_ROOT,
line.getOptionValue('o'));
+ String outExt = ".finf";
+ if(outFormat.equals("xml")) {
+ outExt = ".xml";
+ }
+ if(line.hasOption("co")) {
+ configData.put(IOConstants.PARAM_COMPRESSION,"gzip");
+ outExt = outExt + ".gz";
+ } else {
+ configData.put(IOConstants.PARAM_COMPRESSION,"none");
+ }
+ configData.put(IOConstants.PARAM_FILE_EXTENSION,outExt);
+ configData.put(IOConstants.PARAM_ENCODING, "UTF-8");
+ configData.put(IOConstants.PARAM_REPLACE_EXTENSION, "true");
+ Class<? extends OutputHandler> ouputHandlerClass =
+ Class.forName(outputHandlerClassName, true, Gate.getClassLoader())
.asSubclass(OutputHandler.class);
- OutputHandler outHandler = ouputHandlerClass.newInstance();
- outHandler.config(configData);
- List<AnnotationSetDefinition> asDefs = new
ArrayList<AnnotationSetDefinition>();
- outHandler.setAnnSetDefinitions(asDefs);
- outHandler.init();
- // log.info("Have output handler: "+outHandler);
- List<OutputHandler> outHandlers = new ArrayList<OutputHandler>();
- outHandlers.add(outHandler);
+ OutputHandler outHandler = ouputHandlerClass.newInstance();
+ outHandler.config(configData);
+ List<AnnotationSetDefinition> asDefs = new
ArrayList<AnnotationSetDefinition>();
+ outHandler.setAnnSetDefinitions(asDefs);
+ outHandler.init();
+ // log.info("Have output handler: "+outHandler);
+ outHandlers.add(outHandler);
+ } // if option -o is given
aBatch.setOutputHandlers(outHandlers);
String enumeratorClassName =
"gate.cloud.io.file.FileDocumentEnumerator";
Class<? extends DocumentEnumerator> enumeratorClass =
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, SlashDot.org! http://sdm.link/slashdot
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs