Revision: 18202
http://sourceforge.net/p/gate/code/18202
Author: ian_roberts
Date: 2014-07-20 18:55:23 +0000 (Sun, 20 Jul 2014)
Log Message:
-----------
Added the concept of a "streaming" input handler.
Rather than fetching individual documents from some source given a document ID,
streaming handlers operate like an iterator - each time you call nextDocument()
the next available document will be returned. Streaming input handlers do not
require a separate document enumerator or pre-defined list of document IDs, but
they require that all documents are loaded in a single thread - you can't
parallelize the loading of different documents.
The motivating use case for this is the format in which data comes from the
Twitter or DataSift streaming APIs, with one file that contains a continuous
stream of JSON objects one after the other. JSONStreamingInputHandler can load
these kinds of files, treating each JSON object in the stream as a separate
GATE document. For this to work, a DocumentFormat implementation must exist
for the JSON format in question, to unpack the text from the appropriate
property of each JSON object.
Modified Paths:
--------------
gcp/trunk/.classpath
gcp/trunk/src/gate/cloud/batch/Batch.java
gcp/trunk/src/gate/cloud/batch/BatchRunner.java
gcp/trunk/src/gate/cloud/batch/DocumentProcessor.java
gcp/trunk/src/gate/cloud/batch/PooledDocumentProcessor.java
gcp/trunk/src/gate/cloud/batch/ProcessResult.java
gcp/trunk/src/gate/cloud/io/IOConstants.java
gcp/trunk/src/gate/cloud/util/Tools.java
gcp/trunk/src/gate/cloud/util/XMLBatchParser.java
Added Paths:
-----------
gcp/trunk/src/gate/cloud/batch/EndOfBatchResult.java
gcp/trunk/src/gate/cloud/io/StreamingInputHandler.java
gcp/trunk/src/gate/cloud/io/json/
gcp/trunk/src/gate/cloud/io/json/JSONStreamingInputHandler.java
Modified: gcp/trunk/.classpath
===================================================================
--- gcp/trunk/.classpath 2014-07-20 01:20:00 UTC (rev 18201)
+++ gcp/trunk/.classpath 2014-07-20 18:55:23 UTC (rev 18202)
@@ -5,7 +5,6 @@
<classpathentry kind="con"
path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="lib" path="conf"/>
<classpathentry kind="lib" path="lib/mimir-client-5.0.jar"/>
- <classpathentry combineaccessrules="false" kind="src" path="/GATE"/>
<classpathentry kind="con"
path="org.apache.ivyde.eclipse.cpcontainer.IVYDE_CONTAINER/?project=gcp&ivyXmlPath=build%2Fivy.xml&confs=*&ivySettingsPath=%24%7Bworkspace_loc%3Agcp%2Fbuild%2Fivysettings.xml%7D&loadSettingsOnDemand=false&propertyFiles="/>
<classpathentry kind="output" path="classes"/>
</classpath>
Modified: gcp/trunk/src/gate/cloud/batch/Batch.java
===================================================================
--- gcp/trunk/src/gate/cloud/batch/Batch.java 2014-07-20 01:20:00 UTC (rev
18201)
+++ gcp/trunk/src/gate/cloud/batch/Batch.java 2014-07-20 18:55:23 UTC (rev
18202)
@@ -141,7 +141,6 @@
if(restarting) {
try {
// in the report XML, the document IDs are only represented as Strings
- Set<String> completedDocuments = new HashSet<String>();
logger.debug("Processing existing report file");
InputStream bakIn =
new BufferedInputStream(new FileInputStream(backupFile));
@@ -211,15 +210,17 @@
bakIn.close();
backupFile.delete();
}
- // filter the documents already processed
- List<DocumentID> unprocessedDocs = new ArrayList<DocumentID>();
- for(DocumentID docId : documentIDs) {
- if(!completedDocuments.contains(docId.getIdText())) {
- unprocessedDocs.add(docId);
+ // filter the documents already processed, if the full list is known up front
+ if(documentIDs != null) {
+ List<DocumentID> unprocessedDocs = new ArrayList<DocumentID>();
+ for(DocumentID docId : documentIDs) {
+ if(!completedDocuments.contains(docId.getIdText())) {
+ unprocessedDocs.add(docId);
+ }
}
+ unprocessedDocumentIDs = unprocessedDocs.toArray(
+ new DocumentID[unprocessedDocs.size()]);
}
- unprocessedDocumentIDs = unprocessedDocs.toArray(
- new DocumentID[unprocessedDocs.size()]);
} catch(XMLStreamException e) {
throw new GateException("Cannot write to the report file!", e);
} catch(IOException e) {
@@ -236,6 +237,8 @@
private DocumentID[] documentIDs;
private DocumentID[] unprocessedDocumentIDs;
+
+ private Set<String> completedDocuments = new HashSet<String>();
private CorpusController gateApplication;
@@ -295,7 +298,9 @@
}
/**
- * Gets the list of input document IDs in this batch.
+ * Gets the list of input document IDs in this batch. May be null
+ * for streaming batches.
+ *
* @return an array of {@link String}s.
*/
public DocumentID[] getDocumentIDs() {
@@ -372,11 +377,24 @@
* are still to be processed. For a clean batch this would be the
* same as {@link #getDocumentIDs()} but for a batch that has
* been interrupted and restarted the values may be different.
- * @return an array of {@link String}s.
+ * May be null for streaming batches, where the full list of
+ * document IDs is not known up-front.
*/
public DocumentID[] getUnprocessedDocumentIDs() {
return unprocessedDocumentIDs;
}
+
+ /**
+ * This gets the set of all document IDs from this batch that
+ * have been successfully processed previously. For a clean
+ * batch this would be empty, but for a batch that has been
+ * interrupted and restarted the set will contain document
+ * IDs that are marked as SUCCEEDED in the partial report
+ * file from the previous run.
+ */
+ public Set<String> getCompletedDocuments() {
+ return completedDocuments;
+ }
public String toString() {
return "Batch ID: "
Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java
===================================================================
--- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-20 01:20:00 UTC
(rev 18201)
+++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-20 18:55:23 UTC
(rev 18202)
@@ -13,6 +13,7 @@
import gate.Gate;
import gate.cloud.batch.BatchJobData.JobState;
+import gate.cloud.batch.ProcessResult.ReturnCode;
import gate.cloud.util.CLibrary;
import gate.cloud.util.Tools;
import gate.cloud.util.XMLBatchParser;
@@ -48,10 +49,12 @@
import com.sun.jna.Platform;
import gate.CorpusController;
+import gate.cloud.io.DocumentData;
import gate.cloud.io.DocumentEnumerator;
import gate.cloud.io.IOConstants;
import gate.cloud.io.InputHandler;
import gate.cloud.io.OutputHandler;
+import gate.cloud.io.StreamingInputHandler;
import gate.util.persistence.PersistenceManager;
import java.util.LinkedList;
import org.apache.commons.cli.BasicParser;
@@ -139,15 +142,16 @@
reportWriter = batch.getReportWriter();
// any existing report file has now been processed, so we know
// the correct number of unprocessed document IDs
- totalDocs = batch.getUnprocessedDocumentIDs().length;
+ totalDocs = batch.getUnprocessedDocumentIDs() == null ? -1 : batch.getUnprocessedDocumentIDs().length;
startTime = System.currentTimeMillis();
setState(JobState.RUNNING);
resultQueue = new LinkedBlockingQueue<ProcessResult>();
- if(totalDocs > 0) {
+ if(totalDocs != 0) {
+ final InputHandler inputHandler = batch.getInputHandler();
processor = new PooledDocumentProcessor(executor.getCorePoolSize());
processor.setController(batch.getGateApplication());
processor.setExecutor(executor);
- processor.setInputHandler(batch.getInputHandler());
+ processor.setInputHandler(inputHandler);
processor.setOutputHandlers(batch.getOutputs());
processor.setResultQueue(resultQueue);
processor.init();
@@ -159,10 +163,17 @@
log.info("Duplication time (seconds): "+(duplicationFinishedTime-loadingFinishedTime)/1000.0);
jobPusher = new Thread(new Runnable() {
public void run() {
- for(DocumentID id : batch.getUnprocessedDocumentIDs()) {
- processor.processDocument(id);
+ if(batch.getDocumentIDs() == null && inputHandler instanceof StreamingInputHandler) {
+ ((StreamingInputHandler)inputHandler).startBatch(batch);
+ processor.processStreaming();
if(Thread.interrupted()) { return; }
+ } else {
+ for(DocumentID id : batch.getUnprocessedDocumentIDs()) {
+ processor.processDocument(id);
+ if(Thread.interrupted()) { return; }
+ }
}
+ resultQueue.add(new EndOfBatchResult());
}
}, "Batch \"" + getBatchId() + "\"-job-pusher");
jobPusher.start();
@@ -193,7 +204,7 @@
* @see gate.sam.batch.BatchJobData#getRemainingDocumentCount()
*/
public int getRemainingDocumentCount() {
- return totalDocs - errorDocs - successDocs;
+ return (totalDocs < 0) ? -1 : totalDocs - errorDocs - successDocs;
}
/*
@@ -304,26 +315,31 @@
if(job.getState() == JobState.RUNNING) {
List<ProcessResult> results = new ArrayList<ProcessResult>();
int resultsCount = job.resultQueue.drainTo(results);
+ boolean finishedBatch = false;
try {
for(ProcessResult result : results) {
- long fileSize = result.getOriginalFileSize();
- long docLength = result.getDocumentLength();
- if(fileSize > 0) job.totalBytes += fileSize;
- if(docLength > 0) job.totalChars += docLength;
-
- job.reportWriter.writeCharacters("\n");
- Tools.writeResultToXml(result, job.reportWriter);
- switch(result.getReturnCode()){
- case SUCCESS:
- job.successDocs++;
- break;
- case FAIL:
- job.errorDocs++;
- break;
+ if(result.getReturnCode() == ReturnCode.END_OF_BATCH) {
+ finishedBatch = true;
+ } else {
+ long fileSize = result.getOriginalFileSize();
+ long docLength = result.getDocumentLength();
+ if(fileSize > 0) job.totalBytes += fileSize;
+ if(docLength > 0) job.totalChars += docLength;
+
+ job.reportWriter.writeCharacters("\n");
+ Tools.writeResultToXml(result, job.reportWriter);
+ switch(result.getReturnCode()){
+ case SUCCESS:
+ job.successDocs++;
+ break;
+ case FAIL:
+ job.errorDocs++;
+ break;
+ }
}
}
job.reportWriter.flush();
- if(job.getRemainingDocumentCount() == 0) {
+ if(finishedBatch) {
job.setState(JobState.FINISHED);
//close the <documents> element
job.reportWriter.writeCharacters("\n");
Modified: gcp/trunk/src/gate/cloud/batch/DocumentProcessor.java
===================================================================
--- gcp/trunk/src/gate/cloud/batch/DocumentProcessor.java 2014-07-20
01:20:00 UTC (rev 18201)
+++ gcp/trunk/src/gate/cloud/batch/DocumentProcessor.java 2014-07-20
18:55:23 UTC (rev 18202)
@@ -16,8 +16,6 @@
import gate.cloud.io.OutputHandler;
import gate.creole.ResourceInstantiationException;
-import java.io.File;
-import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
@@ -32,15 +30,17 @@
public void setController(CorpusController c);
/**
- * Sets the {@link InputHandler} that should be used to load input documents.
- * @param handler the handler that will provide the GATE documents to be
- * processed.
+ * Sets the {@link InputHandler} that should be used to load input
+ * documents.
+ *
+ * @param handler the handler that will provide the GATE documents to
+ * be processed.
*/
public void setInputHandler(InputHandler handler);
/**
- * Add the given definition to the set of standoff files this processor will
- * output for each document.
+ * Add the given definition to the set of standoff files this
+ * processor will output for each document.
*/
public void setOutputHandlers(List<OutputHandler> outputDefs);
@@ -55,9 +55,10 @@
public void setResultQueue(BlockingQueue<ProcessResult> queue);
/**
- * This method should be called after the processor has been configured but
- * before the first call to {@link #processDocument}, in order to initialise
- * the processor and make it ready to serve requests.
+ * This method should be called after the processor has been
+ * configured but before the first call to {@link #processDocument},
+ * in order to initialise the processor and make it ready to serve
+ * requests.
*/
public void init() throws ResourceInstantiationException;
@@ -68,12 +69,20 @@
public void dispose();
/**
- * Process the document with the given ID. The document will be processed,
- * the resulting content and annotations will be written out to their
- * respective files (according to the configured output definitions), and the
- * result status and statistics will be returned.
+ * Process the document with the given ID. The document will be
+ * processed, the resulting content and annotations will be written
+ * out to their respective files (according to the configured output
+ * definitions), and the result status and statistics will be
+ * returned.
*
* @param docId the ID of the document to process
*/
public void processDocument(DocumentID docId);
+
+ /**
+ * Process the stream of documents from this processor's
+ * StreamingInputHandler. This method will only be called if the input
+ * handler implements that interface.
+ */
+ public void processStreaming();
}
Added: gcp/trunk/src/gate/cloud/batch/EndOfBatchResult.java
===================================================================
--- gcp/trunk/src/gate/cloud/batch/EndOfBatchResult.java
(rev 0)
+++ gcp/trunk/src/gate/cloud/batch/EndOfBatchResult.java 2014-07-20
18:55:23 UTC (rev 18202)
@@ -0,0 +1,34 @@
+package gate.cloud.batch;
+
+import java.util.Map;
+
+public class EndOfBatchResult implements ProcessResult {
+
+ public ReturnCode getReturnCode() {
+ return ReturnCode.END_OF_BATCH;
+ }
+
+ public long getExecutionTime() {
+ return 0;
+ }
+
+ public long getOriginalFileSize() {
+ return 0;
+ }
+
+ public long getDocumentLength() {
+ return 0;
+ }
+
+ public Map<String, Integer> getAnnotationCounts() {
+ return null;
+ }
+
+ public DocumentID getDocumentId() {
+ return null;
+ }
+
+ public String getErrorDescription() {
+ return null;
+ }
+}
Property changes on: gcp/trunk/src/gate/cloud/batch/EndOfBatchResult.java
___________________________________________________________________
Added: svn:keywords
## -0,0 +1 ##
+Id
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Modified: gcp/trunk/src/gate/cloud/batch/PooledDocumentProcessor.java
===================================================================
--- gcp/trunk/src/gate/cloud/batch/PooledDocumentProcessor.java 2014-07-20
01:20:00 UTC (rev 18201)
+++ gcp/trunk/src/gate/cloud/batch/PooledDocumentProcessor.java 2014-07-20
18:55:23 UTC (rev 18202)
@@ -18,6 +18,7 @@
import gate.cloud.io.DocumentData;
import gate.cloud.io.InputHandler;
import gate.cloud.io.OutputHandler;
+import gate.cloud.io.StreamingInputHandler;
import gate.cloud.util.GateResourcePool;
import gate.creole.ExecutionException;
import gate.creole.ResourceInstantiationException;
@@ -199,6 +200,69 @@
Thread.currentThread().interrupt();
}
}
+
+ /**
+ * Process a stream of documents from a StreamingInputHandler, reporting
+ * success or failure of each document to the result queue.
+ */
+ public void processStreaming() {
+ StreamingInputHandler stream = (StreamingInputHandler)inputHandler;
+ log.info("Processing in streaming mode");
+ DocumentData dd = null;
+ try {
+ while((dd = stream.nextDocument()) != null) {
+ final DocumentData docData = dd;
+ log.debug("Loaded document " + dd.id);
+ final CorpusController controller = appPool.take();
+ if(controller != null) {
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ try {
+ log.debug("processing document " + docData.id);
+ processDocumentWithGATE(docData, controller);
+ log.debug("exporting results for document " + docData.id);
+ exportResults(docData);
+ reportSuccess(docData);
+ log.debug("document " + docData.id + " processed successfully");
+ }
+ finally {
+ if(docData != null && docData.document != null) {
+ Factory.deleteResource(docData.document);
+ docData.document = null;
+ }
+ }
+ }
+ catch(Exception e) {
+ log.error("Error processing document " + docData.id, e);
+ reportFailure(docData.id, docData, e);
+ }
+ finally {
+ appPool.release(controller);
+ }
+ }
+ };
+
+ try {
+ executor.execute(r);
+ }
+ catch(RejectedExecutionException ree) {
+ log.error("Processing job for document " + docData.id
+ + " could not be executed", ree);
+ // if the executor refused the task, release the controller
+ // here, otherwise let the task release it
+ appPool.release(controller);
+ }
+ }
+ }
+ }
+ catch(InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch(Exception e) {
+ log.error("Error processing document " + dd.id, e);
+ reportFailure(dd.id, dd, e);
+ }
+ }
/**
* Process the given document with a GATE application from the pool.
Modified: gcp/trunk/src/gate/cloud/batch/ProcessResult.java
===================================================================
--- gcp/trunk/src/gate/cloud/batch/ProcessResult.java 2014-07-20 01:20:00 UTC
(rev 18201)
+++ gcp/trunk/src/gate/cloud/batch/ProcessResult.java 2014-07-20 18:55:23 UTC
(rev 18202)
@@ -22,7 +22,7 @@
/**
* An enumeration of possible return codes.
*/
- public enum ReturnCode {SUCCESS, FAIL};
+ public enum ReturnCode {SUCCESS, FAIL, END_OF_BATCH};
/**
* Gets the return code of a process.
Modified: gcp/trunk/src/gate/cloud/io/IOConstants.java
===================================================================
--- gcp/trunk/src/gate/cloud/io/IOConstants.java 2014-07-20 01:20:00 UTC
(rev 18201)
+++ gcp/trunk/src/gate/cloud/io/IOConstants.java 2014-07-20 18:55:23 UTC
(rev 18202)
@@ -151,6 +151,12 @@
* current platform default encoding.
*/
public static final String PARAM_FILE_NAME_ENCODING = "fileNameEncoding";
+
+ /**
+ * JSON Pointer expression defining where to find the document ID in a
+ * streamed JSON object.
+ */
+ public static final String PARAM_ID_POINTER = "idPointer";
/**
* XML namespace used for all elements in a batch definition XML file.
Added: gcp/trunk/src/gate/cloud/io/StreamingInputHandler.java
===================================================================
--- gcp/trunk/src/gate/cloud/io/StreamingInputHandler.java
(rev 0)
+++ gcp/trunk/src/gate/cloud/io/StreamingInputHandler.java 2014-07-20
18:55:23 UTC (rev 18202)
@@ -0,0 +1,47 @@
+/*
+ * StreamingInputHandler.java
+ * Copyright (c) 2007-2014, The University of Sheffield.
+ *
+ * This file is part of GCP (see http://gate.ac.uk/), and is free
+ * software, licenced under the GNU Affero General Public License,
+ * Version 3, November 2007.
+ *
+ *
+ * $Id$
+ */
+package gate.cloud.io;
+
+import gate.cloud.batch.Batch;
+import gate.util.GateException;
+
+import java.io.IOException;
+
+/**
+ * Input handler that operates in "streaming mode" where the list of
+ * document IDs is not known up-front. Instead of fetching a specific
+ * document by ID the handler simply returns the next available document
+ * each time {@link #nextDocument()} is called.
+ *
+ * @author ian
+ *
+ */
+public interface StreamingInputHandler extends InputHandler {
+
+ /**
+ * Called just before GCP starts requesting documents from the
+ * handler.
+ *
+ * @param batch the batch that is about to start.
+ */
+ public void startBatch(Batch batch);
+
+ /**
+ * Load and return the next available document for this handler, or
+ * <code>null</code> if there are no more documents to process. This
+ * method does not need to be thread safe - it will only be called
+ * from one thread at a time.
+ *
+ * @return the loaded document, which must include a suitable ID
+ */
+ public DocumentData nextDocument() throws IOException, GateException;
+}
Property changes on: gcp/trunk/src/gate/cloud/io/StreamingInputHandler.java
___________________________________________________________________
Added: svn:keywords
## -0,0 +1 ##
+Id
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: gcp/trunk/src/gate/cloud/io/json/JSONStreamingInputHandler.java
===================================================================
--- gcp/trunk/src/gate/cloud/io/json/JSONStreamingInputHandler.java
(rev 0)
+++ gcp/trunk/src/gate/cloud/io/json/JSONStreamingInputHandler.java
2014-07-20 18:55:23 UTC (rev 18202)
@@ -0,0 +1,284 @@
+/*
+ * JSONStreamingInputHandler.java
+ * Copyright (c) 2007-2014, The University of Sheffield.
+ *
+ * This file is part of GCP (see http://gate.ac.uk/), and is free
+ * software, licenced under the GNU Affero General Public License,
+ * Version 3, November 2007.
+ *
+ *
+ * $Id$
+ */
+package gate.cloud.io.json;
+
+import static gate.cloud.io.IOConstants.PARAM_BATCH_FILE_LOCATION;
+import static gate.cloud.io.IOConstants.PARAM_COMPRESSION;
+import static gate.cloud.io.IOConstants.PARAM_ID_POINTER;
+import static gate.cloud.io.IOConstants.PARAM_MIME_TYPE;
+import static gate.cloud.io.IOConstants.PARAM_SOURCE_FILE_LOCATION;
+import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_GZIP;
+import gate.Document;
+import gate.Factory;
+import gate.FeatureMap;
+import gate.cloud.batch.Batch;
+import gate.cloud.batch.DocumentID;
+import gate.cloud.io.DocumentData;
+import gate.cloud.io.IOConstants;
+import gate.cloud.io.StreamingInputHandler;
+import gate.util.GateException;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.ProcessBuilder.Redirect;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.log4j.Logger;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonParser.Feature;
+import com.fasterxml.jackson.core.JsonPointer;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * <p>
+ * Streaming-mode input handler that reads from a file containing either
+ * a top-level JSON array containing a number of objects, or a sequence
+ * of top-level JSON objects concatenated together. This is the typical
+ * format returned by social media streams from Twitter or DataSift.
+ * Each JSON object will be treated as a GATE document, with the
+ * document ID taken from a property in the JSON (the path to the ID
+ * property is a configuration option). Objects for which the ID cannot
+ * be found will be ignored.
+ * </p>
+ * <p>
+ * The input file may be compressed. If the "compression" option is set
+ * to "gzip" then the file will be unpacked using Java's native GZIP
+ * support. Any other value of "compression" will be treated as a
+ * command line to a program that expects the file name as its final
+ * parameter and will produce uncompressed output on its standard out.
+ * The command will be split into words at whitespace, so embedded
+ * whitespace within a single word is not permitted. For example, to
+ * handle JSON files compressed in LZO format use
+ * <code>compression="lzop -dc"</code>.
+ * </p>
+ * <p>
+ * Example for Twitter streaming API:
+ * </p>
+ *
+ * <pre>
+ * <input class="gate.cloud.io.json.JSONStreamingInputHandler"
+ * srcFile="tweets.lzo"
+ * compression="lzop -dc"
+ * mimeType="text/x-json-twitter"
+ * idPointer="/id_str" />
+ * </pre>
+ *
+ * <p>
+ * Example for DataSift:
+ * </p>
+ *
+ * <pre>
+ * <input class="gate.cloud.io.json.JSONStreamingInputHandler"
+ * srcFile="interactions.gz"
+ * compression="gzip"
+ * mimeType="text/x-json-datasift"
+ * idPointer="/interaction/id" />
+ * </pre>
+ *
+ * @author Ian Roberts
+ *
+ */
+public class JSONStreamingInputHandler implements StreamingInputHandler {
+
+ private static Logger logger = Logger
+ .getLogger(JSONStreamingInputHandler.class);
+
+ public DocumentData getInputDocument(DocumentID id) throws IOException,
+ GateException {
+ throw new UnsupportedOperationException(
+ "JSONStreamingInputHandler can only operate in streaming mode");
+ }
+
+ /**
+ * Base directory of the batch.
+ */
+ protected File batchDir;
+
+ /**
+ * The source file from which the JSON objects will be streamed.
+ */
+ protected File srcFile;
+
+ /**
+ * Compression applied to the input file. This can be
+ * {@link IOConstants#VALUE_COMPRESSION_GZIP} in which case the file
+ * will be unpacked using Java's native GZIP support. Any other value
+ * is assumed to be a command line to an external command that can
+ * accept an additional parameter giving the path to the file and
+ * produce the uncompressed data on its standard output, e.g.
+ * "lzop -dc" for .lzo compression.
+ */
+ protected String compression;
+
+ /**
+ * The mime type used when loading documents.
+ */
+ protected String mimeType;
+
+ /**
+ * JSON Pointer expression to find the item in the JSON node tree that
+ * represents the document identifier. E.g. "/id_str" for Twitter JSON
+ * or "/interaction/id" for DataSift.
+ */
+ protected JsonPointer idPointer;
+
+ /**
+ * Document IDs that are already complete after a previous run of this
+ * batch.
+ */
+ protected Set<String> completedDocuments;
+
+ /**
+ * External decompression process, if applicable.
+ */
+ protected Process decompressProcess = null;
+
+ protected ObjectMapper objectMapper;
+
+ protected JsonParser jsonParser;
+
+ protected MappingIterator<JsonNode> docIterator;
+
+ public void config(Map<String, String> configData) throws IOException,
+ GateException {
+ // srcFile
+ String srcFileStr = configData.get(PARAM_SOURCE_FILE_LOCATION);
+ if(srcFileStr == null) {
+ throw new IllegalArgumentException("Parameter "
+ + PARAM_SOURCE_FILE_LOCATION + " is required");
+ } else {
+ String batchFileStr = configData.get(PARAM_BATCH_FILE_LOCATION);
+ if(batchFileStr != null) {
+ batchDir = new File(batchFileStr).getParentFile();
+ }
+ srcFile = new File(srcFileStr);
+ if(!srcFile.isAbsolute()) {
+ srcFile = new File(batchDir, srcFileStr);
+ }
+ if(!srcFile.exists()) {
+ throw new IllegalArgumentException("File \"" + srcFile
+ + "\", provided as value for required parameter \""
+ + PARAM_SOURCE_FILE_LOCATION + "\", does not exist!");
+ }
+ if(!srcFile.isFile()) {
+ throw new IllegalArgumentException("File \"" + srcFile
+ + "\", provided as value for required parameter \""
+ + PARAM_SOURCE_FILE_LOCATION + "\", is not a file!");
+ }
+ }
+
+ // idPointer
+ String idPointerStr = configData.get(PARAM_ID_POINTER);
+ if(idPointerStr == null) {
+ throw new IllegalArgumentException("Parameter " + PARAM_ID_POINTER
+ + " is required");
+ }
+ idPointer = JsonPointer.compile(idPointerStr);
+
+ // compression
+ compression = configData.get(PARAM_COMPRESSION);
+ // mime type
+ mimeType = configData.get(PARAM_MIME_TYPE);
+ }
+
+ public void startBatch(Batch b) {
+ completedDocuments = b.getCompletedDocuments();
+ }
+
+ public void init() throws IOException, GateException {
+ InputStream inputStream = null;
+ if(compression == null) {
+ inputStream = new FileInputStream(srcFile);
+ } else if(VALUE_COMPRESSION_GZIP.equals(compression)) {
+ inputStream = new GZIPInputStream(new FileInputStream(srcFile));
+ } else {
+ // treat compression value as a command line
+ List<String> cmdLine =
+ new ArrayList<String>(Arrays.asList(compression.trim().split(
+ "\\s+")));
+ cmdLine.add(srcFile.getAbsolutePath());
+ ProcessBuilder pb = new ProcessBuilder(cmdLine);
+ pb.directory(batchDir);
+ pb.redirectError(Redirect.INHERIT);
+ pb.redirectOutput(Redirect.PIPE);
+ decompressProcess = pb.start();
+ inputStream = decompressProcess.getInputStream();
+ }
+
+ objectMapper = new ObjectMapper();
+ jsonParser =
+ objectMapper.getFactory().createParser(inputStream)
+ .enable(Feature.AUTO_CLOSE_SOURCE);
+ // If the first token in the stream is the start of an array ("[")
+ // then
+ // assume the stream as a whole is an array of objects, one per
+ // document.
+ // To handle this, simply clear the token - The MappingIterator
+ // returned
+ // by readValues will cope with the rest in either form.
+ if(jsonParser.nextToken() == JsonToken.START_ARRAY) {
+ jsonParser.clearCurrentToken();
+ }
+ docIterator = objectMapper.readValues(jsonParser, JsonNode.class);
+ }
+
+ public void close() throws IOException, GateException {
+ docIterator.close();
+ jsonParser.close();
+ if(decompressProcess != null) {
+ try {
+ decompressProcess.waitFor();
+ } catch(InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public DocumentData nextDocument() throws IOException, GateException {
+ while(docIterator.hasNextValue()) {
+ JsonNode json = docIterator.nextValue();
+ String id = json.at(idPointer).asText();
+ if(id == null || "".equals(id)) {
+ // can't find an ID, assume this is a "delete" or similar and
+ // ignore it
+ if(logger.isDebugEnabled()) {
+ logger.debug("No ID found in JSON object " + json + " - ignored");
+ }
+ } else {
+ DocumentID docId = new DocumentID(id);
+ FeatureMap docParams = Factory.newFeatureMap();
+ docParams.put(Document.DOCUMENT_STRING_CONTENT_PARAMETER_NAME,
+ json.toString());
+ if(mimeType != null) {
+ docParams.put(Document.DOCUMENT_MIME_TYPE_PARAMETER_NAME, mimeType);
+ }
+ Document gateDoc =
+ (Document)Factory.createResource("gate.corpora.DocumentImpl",
+ docParams, Factory.newFeatureMap(), id);
+ return new DocumentData(gateDoc, docId);
+ }
+ }
+ return null;
+ }
+
+}
Property changes on:
gcp/trunk/src/gate/cloud/io/json/JSONStreamingInputHandler.java
___________________________________________________________________
Added: svn:keywords
## -0,0 +1 ##
+Id
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Modified: gcp/trunk/src/gate/cloud/util/Tools.java
===================================================================
--- gcp/trunk/src/gate/cloud/util/Tools.java 2014-07-20 01:20:00 UTC (rev
18201)
+++ gcp/trunk/src/gate/cloud/util/Tools.java 2014-07-20 18:55:23 UTC (rev
18202)
@@ -45,7 +45,7 @@
public static void writeResultToXml(ProcessResult result,
XMLStreamWriter writer) throws XMLStreamException {
writer.writeStartElement(Tools.REPORT_NAMESPACE, "processResult");
- writer.writeAttribute("id", result.getDocumentId().toString());
+ writer.writeAttribute("id", result.getDocumentId().getIdText());
writer.writeAttribute("returnCode",
String.valueOf(result.getReturnCode()));
//write the size of the file
if(result.getOriginalFileSize() >= 0) {
Modified: gcp/trunk/src/gate/cloud/util/XMLBatchParser.java
===================================================================
--- gcp/trunk/src/gate/cloud/util/XMLBatchParser.java 2014-07-20 01:20:00 UTC
(rev 18201)
+++ gcp/trunk/src/gate/cloud/util/XMLBatchParser.java 2014-07-20 18:55:23 UTC
(rev 18202)
@@ -178,18 +178,21 @@
}
batch.setOutputHandlers(outputHandlers);
- List<DocumentID> docIds = new LinkedList<DocumentID>();
- for(Object item : docIDsOrSpecs) {
- if(item instanceof DocumentID) {
- docIds.add((DocumentID)item);
- } else if(item instanceof HandlerSpec) {
- DocumentEnumerator enumerator = ((HandlerSpec)item).toDocumentEnumerator();
- while(enumerator.hasNext()) docIds.add(enumerator.next());
+ // if no doc IDs or enumerators then assume streaming mode
+ if(docIDsOrSpecs != null && docIDsOrSpecs.size() > 0) {
+ List<DocumentID> docIds = new LinkedList<DocumentID>();
+ for(Object item : docIDsOrSpecs) {
+ if(item instanceof DocumentID) {
+ docIds.add((DocumentID)item);
+ } else if(item instanceof HandlerSpec) {
+ DocumentEnumerator enumerator = ((HandlerSpec)item).toDocumentEnumerator();
+ while(enumerator.hasNext()) docIds.add(enumerator.next());
+ }
}
+
+ batch.setDocumentIDs(docIds.toArray(new DocumentID[docIds.size()]));
}
- batch.setDocumentIDs(docIds.toArray(new DocumentID[docIds.size()]));
-
// check the batch got all the data it needed
batch.init();
return batch;
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
Want fast and easy access to all the code in your enterprise? Index and
search up to 200,000 lines of code with a free copy of Black Duck
Code Sight - the same software that powers the world's largest code
search on Ohloh, the Black Duck Open Hub! Try it now.
http://p.sf.net/sfu/bds
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs