Revision: 18208
http://sourceforge.net/p/gate/code/18208
Author: ian_roberts
Date: 2014-07-23 12:50:37 +0000 (Wed, 23 Jul 2014)
Log Message:
-----------
Output handler that writes JSON objects concatenated to a single file, Twitter
stream style, rather than one file per document.
Modified Paths:
--------------
gcp/trunk/src/gate/cloud/io/IOConstants.java
gcp/trunk/src/gate/cloud/io/json/JSONStreamingInputHandler.java
gcp/trunk/src/gate/cloud/util/Tools.java
Added Paths:
-----------
gcp/trunk/src/gate/cloud/io/json/JSONStreamingOutputHandler.java
Modified: gcp/trunk/src/gate/cloud/io/IOConstants.java
===================================================================
--- gcp/trunk/src/gate/cloud/io/IOConstants.java 2014-07-23 01:21:04 UTC
(rev 18207)
+++ gcp/trunk/src/gate/cloud/io/IOConstants.java 2014-07-23 12:50:37 UTC
(rev 18208)
@@ -157,6 +157,11 @@
* streamed JSON object.
*/
public static final String PARAM_ID_POINTER = "idPointer";
+
+ /**
+ * Target size, in bytes, for a single output file from a streaming
+ * output handler. The size is measured on the file as written to disk
+ * (i.e. after any compression), and a new file is only started once the
+ * current one has grown beyond this size, so files may exceed it somewhat.
+ */
+ public static final String PARAM_CHUNK_SIZE = "chunkSize";
/**
* XML namespace used for all elements in a batch definition XML file.
Modified: gcp/trunk/src/gate/cloud/io/json/JSONStreamingInputHandler.java
===================================================================
--- gcp/trunk/src/gate/cloud/io/json/JSONStreamingInputHandler.java
2014-07-23 01:21:04 UTC (rev 18207)
+++ gcp/trunk/src/gate/cloud/io/json/JSONStreamingInputHandler.java
2014-07-23 12:50:37 UTC (rev 18208)
@@ -64,9 +64,9 @@
* The input file may be compressed. If the "compression" option is set
* to "gzip" then the file will be unpacked using Java's native GZIP
* support. Any other value of "compression" will be treated as a
- * command line to a program that expects the file name as its final
- * parameter and will produce uncompressed output on its standard out.
- * The command will be split into words at whitespace, so embedded
+ * command line to a program that expects the compressed data on its
+ * standard input and will produce uncompressed output on its standard
+ * out. The command will be split into words at whitespace, so embedded
* whitespace within a single word is not permitted. For example, to
* handle JSON files compressed in LZO format use
* <code>compression="lzop -dc"</code>.
@@ -203,6 +203,10 @@
public void startBatch(Batch b) {
completedDocuments = b.getCompletedDocuments();
+ if(completedDocuments != null && completedDocuments.size() > 0) {
+ logger.info("Restarting failed batch - " + completedDocuments.size()
+ + " documents already processed");
+ }
}
public void init() throws IOException, GateException {
@@ -213,14 +217,11 @@
inputStream = new GZIPInputStream(new FileInputStream(srcFile));
} else {
// treat compression value as a command line
- List<String> cmdLine =
- new ArrayList<String>(Arrays.asList(compression.trim().split(
- "\\s+")));
- cmdLine.add(srcFile.getAbsolutePath());
- ProcessBuilder pb = new ProcessBuilder(cmdLine);
+ ProcessBuilder pb = new ProcessBuilder(compression.trim().split("\\s+"));
pb.directory(batchDir);
pb.redirectError(Redirect.INHERIT);
pb.redirectOutput(Redirect.PIPE);
+ pb.redirectInput(srcFile);
decompressProcess = pb.start();
inputStream = decompressProcess.getInputStream();
}
@@ -264,6 +265,8 @@
if(logger.isDebugEnabled()) {
logger.debug("No ID found in JSON object " + json + " - ignored");
}
+ } else if(completedDocuments.contains(id)) {
+ // already processed, ignore
} else {
DocumentID docId = new DocumentID(id);
FeatureMap docParams = Factory.newFeatureMap();
Added: gcp/trunk/src/gate/cloud/io/json/JSONStreamingOutputHandler.java
===================================================================
--- gcp/trunk/src/gate/cloud/io/json/JSONStreamingOutputHandler.java
(rev 0)
+++ gcp/trunk/src/gate/cloud/io/json/JSONStreamingOutputHandler.java
2014-07-23 12:50:37 UTC (rev 18208)
@@ -0,0 +1,224 @@
+/*
+ * JSONStreamingOutputHandler.java
+ * Copyright (c) 2007-2014, The University of Sheffield.
+ *
+ * This file is part of GCP (see http://gate.ac.uk/), and is free
+ * software, licenced under the GNU Affero General Public License,
+ * Version 3, November 2007.
+ *
+ *
+ * $Id$
+ */
+package gate.cloud.io.json;
+
+import static gate.cloud.io.IOConstants.PARAM_BATCH_FILE_LOCATION;
+import static gate.cloud.io.IOConstants.PARAM_CHUNK_SIZE;
+import static gate.cloud.io.IOConstants.PARAM_PATTERN;
+import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_GZIP;
+import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_NONE;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.lang.ProcessBuilder.Redirect;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.log4j.Logger;
+
+import gate.Document;
+import gate.cloud.batch.DocumentID;
+import gate.cloud.io.file.JSONOutputHandler;
+import gate.util.GateException;
+
+/**
+ * JSON output handler that concatenates the JSON representations of
+ * documents, one per line, into a sequence of large "chunk" files rather
+ * than putting each one in its own individual file named for the
+ * document ID.
+ * <p>
+ * Chunk file names are produced by applying the <code>pattern</code>
+ * option (default <code>part-%03d</code>) to an increasing chunk number
+ * and passing the result through the naming strategy. Chunk numbers whose
+ * files already exist are skipped, so restarting a batch does not
+ * overwrite earlier output. A new chunk is started once the current file
+ * has grown beyond <code>chunkSize</code> bytes.
+ * <p>
+ * Compression is applied to each chunk as a whole: the
+ * {@code VALUE_COMPRESSION_GZIP} value uses Java's native GZIP support,
+ * no value or {@code VALUE_COMPRESSION_NONE} writes plain output, and any
+ * other value is treated as a command line (split at whitespace) that
+ * reads uncompressed data on its standard input and writes compressed
+ * data to its standard output.
+ * <p>
+ * Documents are serialized on the calling thread and handed to a single
+ * background writer thread, which is started by {@link #init()} and
+ * stopped by {@link #close()}.
+ *
+ * @author Ian Roberts
+ *
+ */
+public class JSONStreamingOutputHandler extends JSONOutputHandler {
+
+  private static final Logger logger = Logger
+          .getLogger(JSONStreamingOutputHandler.class);
+
+  /** Chunk size (in bytes) used when none, or an invalid one, is configured. */
+  private static final long DEFAULT_CHUNK_SIZE = 99000000L;
+
+  /**
+   * Maximum number of bytes written between checks of the current chunk
+   * file's size, to avoid querying the file system after every document.
+   */
+  private static final long SIZE_CHECK_INTERVAL = 1024L * 1024L;
+
+  /**
+   * Sentinel queued by {@link #close()} to tell the writer thread to stop.
+   * It is compared by identity, so it cannot be confused with real output
+   * (which always contains at least a trailing newline).
+   */
+  private static final byte[] END_OF_DATA = new byte[0];
+
+  /**
+   * {@link String#format} pattern that turns a chunk number into a chunk
+   * file name (before the naming strategy is applied).
+   */
+  protected String pattern;
+
+  /** Target size in bytes of each output chunk file. */
+  protected long chunkSize = -1L;
+
+  /**
+   * Directory containing the batch definition file, used as the working
+   * directory of an external compression command. May be null.
+   */
+  protected File batchDir;
+
+  /** Waits in the background for external compression processes to exit. */
+  protected ExecutorService processWaiter = Executors.newCachedThreadPool();
+
+  /**
+   * Per-thread buffer that the superclass serializes each document into
+   * (see {@link #getFileOutputStream(DocumentID)}).
+   */
+  protected ThreadLocal<ByteArrayOutputStream> baos =
+          new ThreadLocal<ByteArrayOutputStream>() {
+            @Override
+            protected ByteArrayOutputStream initialValue() {
+              return new ByteArrayOutputStream();
+            }
+          };
+
+  /**
+   * Serialized documents waiting for the writer thread. Bounded so that
+   * producers block, rather than exhausting memory, if writing falls behind.
+   */
+  protected BlockingQueue<byte[]> results =
+          new ArrayBlockingQueue<byte[]>(100);
+
+  /** Background thread running the {@link StreamOutputter}. */
+  private Thread outputterThread;
+
+  /**
+   * Reads the batch directory, chunk file name pattern and chunk size from
+   * the configuration, in addition to the superclass's settings.
+   *
+   * @throws GateException if the pattern is not a usable format string or
+   *           does not depend on the chunk number
+   */
+  @Override
+  protected void configImpl(Map<String, String> configData) throws IOException,
+          GateException {
+    super.configImpl(configData);
+    String batchFileStr = configData.get(PARAM_BATCH_FILE_LOCATION);
+    if(batchFileStr != null) {
+      batchDir = new File(batchFileStr).getParentFile();
+    }
+    pattern = configData.get(PARAM_PATTERN);
+    if(pattern == null) {
+      pattern = "part-%03d";
+    }
+    validatePattern(pattern);
+    chunkSize = parseChunkSize(configData.get(PARAM_CHUNK_SIZE));
+  }
+
+  /**
+   * Checks that the chunk name pattern is a valid format string that yields
+   * a different name for each chunk number. Without this, the search for an
+   * unused file name in {@code openNextChunk} would loop forever.
+   */
+  private static void validatePattern(String pattern) throws GateException {
+    boolean constant;
+    try {
+      constant = String.format(pattern, 0).equals(String.format(pattern, 1));
+    } catch(IllegalArgumentException e) {
+      throw new GateException("Invalid output " + PARAM_PATTERN + " \""
+              + pattern + "\": " + e);
+    }
+    if(constant) {
+      throw new GateException("Output " + PARAM_PATTERN + " \"" + pattern
+              + "\" must include the chunk number, e.g. \"part-%03d\"");
+    }
+  }
+
+  /**
+   * Parses the configured chunk size, falling back to
+   * {@link #DEFAULT_CHUNK_SIZE} when it is missing, unparseable or not
+   * positive.
+   */
+  private static long parseChunkSize(String chunkSizeStr) {
+    if(chunkSizeStr == null) {
+      logger.info("Using default chunk size");
+      return DEFAULT_CHUNK_SIZE;
+    }
+    try {
+      long size = Long.parseLong(chunkSizeStr.trim());
+      if(size > 0) {
+        return size;
+      }
+    } catch(NumberFormatException e) {
+      // reported below together with the non-positive case
+    }
+    logger.warn("Invalid " + PARAM_CHUNK_SIZE + " \"" + chunkSizeStr
+            + "\", using default chunk size");
+    return DEFAULT_CHUNK_SIZE;
+  }
+
+  /**
+   * Serializes the document into this thread's buffer via the superclass,
+   * appends a newline and queues the bytes for the writer thread.
+   *
+   * @throws InterruptedIOException if interrupted while waiting for space
+   *           in the queue; the document has not been written
+   */
+  @Override
+  protected void outputDocumentImpl(Document document, DocumentID documentId)
+          throws IOException, GateException {
+    ByteArrayOutputStream buffer = baos.get();
+    byte[] result;
+    try {
+      super.outputDocumentImpl(document, documentId);
+      buffer.write('\n');
+      result = buffer.toByteArray();
+    } finally {
+      // always clear the buffer, so that a document which fails part way
+      // through cannot leave partial JSON in front of the next document
+      buffer.reset();
+    }
+    try {
+      results.put(result);
+    } catch(InterruptedException e) {
+      Thread.currentThread().interrupt();
+      // report the lost document rather than silently claiming success
+      throw new InterruptedIOException("Interrupted while queueing document "
+              + documentId + " for output");
+    }
+  }
+
+  /**
+   * Redirects the superclass's per-document output into this thread's
+   * in-memory buffer instead of a per-document file.
+   */
+  @Override
+  protected OutputStream getFileOutputStream(DocumentID docId)
+          throws IOException {
+    return baos.get();
+  }
+
+  /** Starts the background writer thread. */
+  @Override
+  public void init() throws IOException, GateException {
+    // NOTE(review): super.init() is not called - confirm the parent handler
+    // needs no initialization of its own.
+    outputterThread = new Thread(new StreamOutputter(),
+            "JSONStreamingOutputHandler writer");
+    outputterThread.start();
+  }
+
+  /**
+   * Tells the writer thread that no more documents will arrive, then waits
+   * until all queued output has been written, the last chunk has been
+   * closed and any external compression processes have exited, so the
+   * output files are complete when this method returns.
+   */
+  @Override
+  public void close() throws IOException, GateException {
+    try {
+      results.put(END_OF_DATA);
+      if(outputterThread != null) {
+        outputterThread.join();
+      }
+      // the writer normally shuts this down itself; repeating it is harmless
+      // and guarantees that awaitTermination cannot block forever
+      processWaiter.shutdown();
+      processWaiter.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+    } catch(InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Writer loop: takes serialized documents off {@link #results} and
+   * appends them to the current chunk, rolling over to a new chunk when the
+   * current file exceeds {@link #chunkSize}, until {@link #END_OF_DATA}
+   * is received.
+   */
+  protected class StreamOutputter implements Runnable {
+    private File currentFile;
+
+    private int currentChunk = -1;
+
+    private OutputStream currentOutput;
+
+    private Process currentProcess;
+
+    @Override
+    public void run() {
+      try {
+        try {
+          long bytesSinceLastCheck = 0;
+          byte[] item;
+          while((item = results.take()) != END_OF_DATA) {
+            if(currentOutput == null) {
+              try {
+                openNextChunk();
+              } catch(IOException e) {
+                logger.error("Failed to open output file", e);
+              }
+              if(currentOutput == null) {
+                // nowhere to write this item: drop it rather than letting the
+                // writer thread die, which would leave producers blocked
+                // forever once the queue fills up
+                logger.error("Discarding " + item.length + " bytes of output");
+                continue;
+              }
+            }
+            try {
+              currentOutput.write(item);
+              currentOutput.flush();
+            } catch(IOException e) {
+              logger.warn("Error writing to " + currentFile.getAbsolutePath(), e);
+            }
+            bytesSinceLastCheck += item.length;
+            // check at least once per chunk's worth of data so that chunk
+            // sizes below the check interval are honoured
+            if(bytesSinceLastCheck > Math.min(SIZE_CHECK_INTERVAL, chunkSize)) {
+              if(currentFile.length() > chunkSize) {
+                closeChunk();
+              }
+              bytesSinceLastCheck = 0;
+            }
+          }
+        } finally {
+          closeChunk();
+          processWaiter.shutdown();
+        }
+      } catch(InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    /**
+     * Closes the current chunk (if any). For an external compression
+     * command, its exit is awaited in the background via processWaiter.
+     */
+    private void closeChunk() {
+      if(currentOutput == null) {
+        return;
+      }
+      try {
+        currentOutput.close();
+      } catch(IOException e) {
+        logger.warn("Error closing file " + currentFile.getAbsolutePath(), e);
+      }
+      if(currentProcess != null) {
+        final Process p = currentProcess;
+        processWaiter.execute(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              p.waitFor();
+            } catch(InterruptedException e) {
+              logger.warn("Interrupted while waiting for process", e);
+              Thread.currentThread().interrupt();
+            }
+          }
+        });
+        currentProcess = null;
+      }
+      currentOutput = null;
+      currentFile = null;
+    }
+
+    /**
+     * Opens the next chunk file whose name does not already exist, wrapping
+     * it according to the configured compression.
+     */
+    private void openNextChunk() throws IOException {
+      // when restarting a batch, earlier chunks may already exist on disk;
+      // skip past them rather than overwriting them
+      do {
+        String newFileName = String.format(pattern, ++currentChunk);
+        currentFile = namingStrategy.toFile(new DocumentID(newFileName));
+      } while(currentFile.exists());
+      if(VALUE_COMPRESSION_GZIP.equals(compression)) {
+        FileOutputStream fileOut = new FileOutputStream(currentFile);
+        try {
+          currentOutput = new GZIPOutputStream(fileOut);
+        } catch(IOException e) {
+          // don't leak the file handle if writing the GZIP header fails
+          try {
+            fileOut.close();
+          } catch(IOException suppressed) {
+            e.addSuppressed(suppressed);
+          }
+          throw e;
+        }
+      } else if(compression == null
+              || VALUE_COMPRESSION_NONE.equals(compression)) {
+        currentOutput = new FileOutputStream(currentFile);
+      } else {
+        // treat compression value as a command line that reads uncompressed
+        // data on its stdin and writes compressed data to its stdout
+        ProcessBuilder pb =
+                new ProcessBuilder(compression.trim().split("\\s+"));
+        pb.directory(batchDir);
+        pb.redirectInput(Redirect.PIPE);
+        pb.redirectOutput(currentFile);
+        pb.redirectError(Redirect.INHERIT);
+        currentProcess = pb.start();
+        currentOutput = currentProcess.getOutputStream();
+      }
+    }
+  }
+
+}
Property changes on:
gcp/trunk/src/gate/cloud/io/json/JSONStreamingOutputHandler.java
___________________________________________________________________
Added: svn:keywords
## -0,0 +1 ##
+Id
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Modified: gcp/trunk/src/gate/cloud/util/Tools.java
===================================================================
--- gcp/trunk/src/gate/cloud/util/Tools.java 2014-07-23 01:21:04 UTC (rev
18207)
+++ gcp/trunk/src/gate/cloud/util/Tools.java 2014-07-23 12:50:37 UTC (rev
18208)
@@ -105,7 +105,13 @@
writer.writeEndElement(); writer.writeCharacters("\n ");
writer.writeStartElement(Tools.REPORT_NAMESPACE, "totalDocuments");
- writer.writeCharacters(Integer.toString(jobData.getTotalDocumentCount()));
+ int totalDocs = jobData.getTotalDocumentCount();
+ if(totalDocs < 0) {
+ // streaming mode, so we don't know totaldocs up front, calculate it
+ // from success and error
+ totalDocs = jobData.getSuccessDocumentCount() +
jobData.getErrorDocumentCount();
+ }
+ writer.writeCharacters(Integer.toString(totalDocs));
writer.writeEndElement(); writer.writeCharacters("\n ");
writer.writeStartElement(Tools.REPORT_NAMESPACE, "successfullyProcessed");
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
Want fast and easy access to all the code in your enterprise? Index and
search up to 200,000 lines of code with a free copy of Black Duck
Code Sight - the same software that powers the world's largest code
search on Ohloh, the Black Duck Open Hub! Try it now.
http://p.sf.net/sfu/bds
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs